From 83261bf3da73b738532c98f4f5b755cafec25354 Mon Sep 17 00:00:00 2001
From: Vitali Lovich
Date: Sat, 11 May 2024 00:44:41 -0700
Subject: [PATCH] Implement very simple advisory lock mechanism

Blocking versions can't be implemented because they would require
spawning a new thread to avoid jamming up the background thread used
for other syscalls (the jamming up can generate a deadlock, which is
what testing found).

The blocking version would require spawning a new thread with a
standalone executor, but that's horribly expensive AND runs into
https://github.com/DataDog/glommio/issues/448.
---
 glommio/src/io/dma_file.rs     | 429 ++++++++++++++++++++++++++++++++-
 glommio/src/io/glommio_file.rs | 247 +++++++++++++++++++
 glommio/src/io/mod.rs          |   4 +-
 3 files changed, 676 insertions(+), 4 deletions(-)

diff --git a/glommio/src/io/dma_file.rs b/glommio/src/io/dma_file.rs
index 92e5ae16a..1906f099a 100644
--- a/glommio/src/io/dma_file.rs
+++ b/glommio/src/io/dma_file.rs
@@ -24,6 +24,7 @@ use std::{
     os::unix::io::{AsRawFd, RawFd},
     path::Path,
     rc::Rc,
+    sync::{Arc, Weak as AWeak},
 };
 
 use super::{
@@ -51,6 +52,75 @@ pub(crate) fn align_down(v: u64, align: u64) -> u64 {
     v & !(align - 1)
 }
 
+/// This guard represents that an advisory lock is being held over some file. See [flock(2)](https://linux.die.net/man/2/flock)
+/// for more information about how advisory locks work. This is obtained through [DmaFile::try_lock_exclusive] /
+/// [DmaFile::try_lock_shared]. If not explicitly [unlocked](Self::unlock), the advisory lock is released synchronously on Drop.
+#[derive(Debug, Clone)]
+pub struct AdvisoryLockGuard(Option<Arc<OwnedGlommioFile>>);
+
+impl Drop for AdvisoryLockGuard {
+    fn drop(&mut self) {
+        if let Some(mut locked) = self.0.take().and_then(Arc::into_inner) {
+            unsafe { locked.funlock_immediately() }
+        }
+    }
+}
+
+impl AdvisoryLockGuard {
+    /// Explicitly releases the lock. The file can be successfully closed after calling this.
+    pub async fn unlock(mut self) -> Result<()> {
+        let inner = self.0.take().unwrap();
+
+        match Arc::try_unwrap(inner).map(GlommioFile::from) {
+            Ok(lock) => unsafe { lock.funlock() }.await,
+            Err(still_locked) => Err(crate::GlommioError::CanNotBeClosed(
+                crate::ResourceType::File(
+                    still_locked
+                        .path
+                        .as_ref()
+                        .map_or("path has already been taken!".to_string(), |p| {
+                            p.to_string_lossy().to_string()
+                        }),
+                ),
+                "Another clone of this file exists somewhere - cannot close fd",
+            )),
+        }
+    }
+
+    /// Downgrade the lock guard to a weak reference that won't keep the file open. The lock will be released when
+    /// the last strong reference is dropped.
+    pub fn downgrade(&self) -> WeakAdvisoryLockGuard {
+        WeakAdvisoryLockGuard(Arc::downgrade(self.0.as_ref().unwrap()))
+    }
+
+    /// Like [DmaFile::try_take_last_clone], this returns `Ok(self)` if and only if it's the last clone of the lock
+    /// guard in the program (not including weak references). `Err(self)` is returned if other instances exist.
+    pub fn try_take_last_clone(mut self) -> std::result::Result<Self, Self> {
+        match Arc::try_unwrap(self.0.take().unwrap()) {
+            Ok(unique) => Ok(Self(Some(Arc::new(unique)))),
+            Err(shared) => Err(Self(Some(shared))),
+        }
+    }
+}
+
+/// This holds a weak reference to an advisory lock. The advisory lock may be dropped while this reference is held,
+/// in which case this instance will fail to [upgrade](Self::upgrade).
+#[derive(Default, Debug, Clone)]
+pub struct WeakAdvisoryLockGuard(AWeak<OwnedGlommioFile>);
+
+impl WeakAdvisoryLockGuard {
+    /// Returns a weak lock guard not pointing at anything.
+    pub fn new() -> Self {
+        Default::default()
+    }
+
+    /// Upgrades a weak handle to a held advisory lock back to a strong handle. Returns None if the advisory lock was
+    /// dropped.
+    pub fn upgrade(&self) -> Option<AdvisoryLockGuard> {
+        self.0.upgrade().map(|guard| AdvisoryLockGuard(Some(guard)))
+    }
+}
+
 #[derive(Debug, Clone)]
 /// An asynchronously accessed Direct Memory Access (DMA) file.
 ///
@@ -604,6 +674,95 @@ impl DmaFile {
         self.file.statx().await.map(Into::into)
     }
 
+    /// Tries to acquire a process-wide advisory shared lock on this file instance. If an exclusive advisory lock is
+    /// already held for the underlying file (by any process), this immediately returns with an error. Similarly,
+    /// if a shared lock is already held on this file instance, this returns an error. To acquire multiple shared locks
+    /// in-process on the same file, you must acquire them through file instances obtained from separate calls to open
+    /// the file.
+    ///
+    /// This is equivalent to [flock](https://linux.die.net/man/2/flock) with `LOCK_SH | LOCK_NB` and requires the
+    /// file to have [read](OpenOptions::read) allowed on it.
+    ///
+    /// # File instances and advisory locks
+    ///
+    /// For the purpose of advisory locks, clones and [dupes](Self::dup) count as being the same file instance (even
+    /// though dupes don't keep the file open). Advisory locks keep the underlying file that they were acquired on open
+    /// while they are held.
+    ///
+    /// # Safety
+    ///
+    /// The simple mechanism here only allows one advisory lock holder for the process for a given file instance,
+    /// and that lock must be explicitly [unlocked](AdvisoryLockGuard::unlock) or dropped before another can be acquired.
+    ///
+    /// The reason only a single advisory lock can be held on a given file instance at a time is for safety and
+    /// soundness - the kernel advisory lock is a status bit and any unlock request will unlock the file. That causes
+    /// some danger if you acquire two shared locks and then unlock one of them - the file is unlocked from then on. Or
+    /// worse, you unlock one of them, acquire an exclusive lock and then release what you thought was the shared lock
+    /// but end up releasing the exclusive lock. Similarly, acquiring a shared lock will downgrade an exclusive lock
+    /// you're already holding.
+    ///
+    /// # Usage
+    ///
+    /// The purpose of advisory locks is to synchronize access to a resource. A common use-case is that if you're
+    /// reading from a file, you acquire a shared lock so that other processes can continue reading it too, but you're
+    /// guaranteed that no writer trying to hold an exclusive lock is admitted.
+    ///
+    /// ```no_run
+    /// use glommio::{
+    ///     LocalExecutor,
+    ///     io::{
+    ///         OpenOptions,
+    ///         DmaBuffer,
+    ///     }
+    /// };
+    ///
+    /// let ex = LocalExecutor::default();
+    /// ex.run(async {
+    ///     // A new anonymous file is created within `some_directory/`.
+    ///     let file = OpenOptions::new()
+    ///         .create_new(true)
+    ///         .read(true)
+    ///         .write(true)
+    ///         .tmpfile(true)
+    ///         .dma_open("some_directory")
+    ///         .await
+    ///         .unwrap();
+    ///
+    ///     let guard = file.try_lock_shared().await.unwrap();
+    ///
+    ///     // Until the guard is dropped, no other process on the file system can acquire an advisory exclusive
+    ///     // (write) lock on the file.
+    ///
+    ///     // The file won't close until you either unlock the guard, drop it, or give it back as part of closing.
+    ///     guard.unlock().await.unwrap();
+    /// });
+    /// ```
+    ///
+    /// NOTE:
+    /// [Currently](https://github.com/axboe/liburing/issues/85) flock isn't implemented through io_uring so it requires
+    /// dispatching to a background thread.
+    pub async fn try_lock_shared(&self) -> Result<AdvisoryLockGuard> {
+        self.file
+            .try_lock_shared()
+            .await
+            .map(|f| AdvisoryLockGuard(Some(Arc::new(f))))
+    }
+
+    /// Tries to acquire an advisory exclusive lock on this file instance. If successful, then the OS advisory lock
+    /// is acquired. If it failed, it's either because the OS advisory lock is already held elsewhere OR because this
+    /// file instance already handed out a lock (or is in the process of handing one out) - either exclusive or shared.
+    ///
+    /// This is equivalent to [flock](https://linux.die.net/man/2/flock) with `LOCK_EX | LOCK_NB` and requires the
+    /// file to have [write](OpenOptions::write) allowed on it.
+    ///
+    /// See [try_lock_shared](Self::try_lock_shared) for more details.
+    pub async fn try_lock_exclusive(&self) -> Result<AdvisoryLockGuard> {
+        self.file
+            .try_lock_exclusive()
+            .await
+            .map(|f| AdvisoryLockGuard(Some(Arc::new(f))))
+    }
+
     /// Attempt to confirm no other clones of this file exist. If no clones exist,
     /// Ok(self) is returned. If clones remain, Err(self) is returned. Do not use
     /// this unless you are implementing a polling mechanism to determine when it's
@@ -625,6 +784,51 @@ impl DmaFile {
         }
     }
 
+    /// This will release the advisory lock if and only if the guard is the last instance AND this is the last DmaFile
+    /// instance. On success, self is returned with the guard dropped. Otherwise both the file and the guard are
+    /// returned. This will panic if given a guard that's not for this file.
+    ///
+    /// NOTE: This will panic if the unlock operation fails.
+    pub async fn try_take_last_clone_unlocking_guard(
+        mut self,
+        guard: AdvisoryLockGuard,
+    ) -> std::result::Result<Self, (Self, AdvisoryLockGuard)> {
+        let guard_file = guard.0.as_ref().unwrap();
+        assert!(
+            Arc::ptr_eq(
+                guard_file.lock_state.as_ref().unwrap(),
+                self.file.lock_state.as_ref().unwrap()
+            ),
+            "The provided guard is for {:?} ({}) which doesn't share the same file entry as {:?} ({})",
+            guard_file.path,
+            guard_file.as_raw_fd(),
+            self.path(),
+            self.as_raw_fd()
+        );
+
+        match guard.try_take_last_clone() {
+            Ok(mut last_guard) => {
+                match self
+                    .file
+                    .try_take_last_clone_unlocking_guard(
+                        Arc::into_inner(last_guard.0.take().unwrap()).unwrap(),
+                    )
+                    .await
+                {
+                    Ok(took) => {
+                        self.file = took;
+                        Ok(self)
+                    }
+                    Err((not_unique, guard)) => {
+                        self.file = not_unique;
+                        Err((self, AdvisoryLockGuard(Some(Arc::new(guard)))))
+                    }
+                }
+            }
+            Err(still_locked) => Err((self, still_locked)),
+        }
+    }
+
     /// Closes this DMA file.
     pub async fn close(self) -> Result<()> {
         self.file.close().await
@@ -950,7 +1154,7 @@ impl WeakDmaFile {
     }
 
     /// The major ID of the device containing the filesystem where the file resides.
-    /// The device may be found by issuing a `readlink`` on `/sys/dev/block/<major>:<minor>`
+    /// The device may be found by issuing a `readlink` on `/sys/dev/block/<major>:<minor>`
     pub fn dev_major(&self) -> u32 {
         self.file.dev_major
     }
@@ -972,8 +1176,11 @@ impl WeakDmaFile {
 pub(crate) mod test {
     use super::*;
     use crate::{
-        enclose, test_utils::make_test_directories, ByteSliceMutExt, GlommioError, Latency,
-        LocalExecutor, ResourceType, Shares,
+        enclose,
+        sync::Semaphore,
+        test_utils::make_test_directories,
+        timer::{sleep, timeout},
+        ByteSliceMutExt, GlommioError, Latency, LocalExecutor, ResourceType, Shares,
     };
     use futures::join;
     use futures_lite::{stream, StreamExt};
@@ -2208,4 +2415,220 @@ pub(crate) mod test {
 
         assert_eq!(weak.strong_count(), 0);
     });
+
+    dma_file_test!(advisory_lock_exclusive_wait_for_exclusive, path, _k, {
+        let file = OpenOptions::new()
+            .create_new(true)
+            .read(true)
+            .write(true)
+            .dma_open(path.join("testfile"))
+            .await
+            .unwrap();
+
+        let same_path_different_file = OpenOptions::new()
+            .read(true)
+            .write(true)
+            .dma_open(path.join("testfile"))
+            .await
+            .unwrap();
+
+        let duped = file.dup().unwrap();
+
+        let guard = file.try_lock_exclusive().await.unwrap();
+        let waiting_on_exclusive_lock_attempted = std::rc::Rc::new(Semaphore::new(0));
+
+        let notify_exclusive_lock_attempted = waiting_on_exclusive_lock_attempted.clone();
+        let acquire_exclusive = crate::spawn_local(timeout(Duration::from_millis(500), async move {
+            assert!(same_path_different_file.try_lock_exclusive().await.is_err(), "Shouldn't be able to acquire an exclusive lock on the same path while holding it through a different file");
+            notify_exclusive_lock_attempted.signal(1);
+            loop {
+                if let Ok(locked) = same_path_different_file.try_lock_exclusive().await {
+                    return Ok(locked);
+                }
+                sleep(Duration::from_millis(5)).await;
+            }
+        })).detach();
+
+        duped.try_lock_shared().await.expect_err(
+            "Shouldn't be able to acquire a lock on a dupe while holding it on the parent",
+        );
+        duped.try_lock_exclusive().await.expect_err(
+            "Shouldn't be able to acquire a lock on a dupe while holding it on the parent",
+        );
+
+        let file = file.try_take_last_clone().expect_err(
+            "Shouldn't be able to take the last clone while the lock guard holds a clone of the file",
+        );
+        let weak = file.downgrade();
+        std::mem::drop(file);
+        assert!(
+            weak.upgrade().is_some(),
+            "Guard should be keeping the file open"
+        );
+
+        waiting_on_exclusive_lock_attempted
+            .acquire(1)
+            .await
+            .unwrap();
+        guard.unlock().await.unwrap();
+
+        assert!(
+            weak.upgrade().is_none(),
+            "Guard unlocking should have implicitly caused the file to close"
+        );
+
+        timeout(Duration::from_millis(15), async move {
+            acquire_exclusive.await.expect("Task not cancelled")
+        })
+        .await
+        .expect("Exclusive lock should have been automatically acquired");
+    });
+
+    dma_file_test!(advisory_lock_shared_wait_for_exclusive, path, _k, {
+        let file = OpenOptions::new()
+            .create_new(true)
+            .read(true)
+            .write(true)
+            .dma_open(path.join("testfile"))
+            .await
+            .unwrap();
+
+        let same_path_different_file = OpenOptions::new()
+            .read(true)
+            .write(true)
+            .dma_open(path.join("testfile"))
+            .await
+            .unwrap();
+
+        let duped = file.dup().unwrap();
+
+        let guard = file.try_lock_exclusive().await.unwrap();
+        let waiting_on_shared_lock = std::rc::Rc::new(Semaphore::new(0));
+
+        let notify_waiting_on_shared_lock = waiting_on_shared_lock.clone();
+        let acquire_exclusive = crate::spawn_local(timeout(Duration::from_millis(500), async move {
+            assert!(same_path_different_file.try_lock_shared().await.is_err(), "Shouldn't be able to acquire a shared lock on the same path while holding an exclusive lock through a different file");
+            notify_waiting_on_shared_lock.signal(1);
+            loop {
+                if let Ok(locked) = same_path_different_file.try_lock_shared().await {
+                    return Ok(locked);
+                }
+                sleep(Duration::from_millis(5)).await;
+            }
+        })).detach();
+
+        duped.try_lock_shared().await.expect_err(
+            "Shouldn't be able to acquire a lock on a dupe while holding it on the parent",
+        );
+        duped.try_lock_exclusive().await.expect_err(
+            "Shouldn't be able to acquire a lock on a dupe while holding it on the parent",
+        );
+
+        guard.unlock().await.unwrap();
+
+        timeout(Duration::from_millis(15), async move {
+            acquire_exclusive.await.expect("Task not cancelled")
+        })
+        .await
+        .expect("Shared lock should have been automatically acquired");
+    });
+
+    dma_file_test!(advisory_lock_shared, path, _k, {
+        let file = OpenOptions::new()
+            .create_new(true)
+            .read(true)
+            .write(true)
+            .dma_open(path.join("testfile"))
+            .await
+            .unwrap();
+
+        let same_path_different_file = OpenOptions::new()
+            .read(true)
+            .write(true)
+            .dma_open(path.join("testfile"))
+            .await
+            .unwrap();
+
+        let duped = file.dup().unwrap();
+
+        let guard = file.try_lock_shared().await.unwrap();
+
+        duped.try_lock_shared().await.expect_err(
+            "Shouldn't be able to acquire a lock on a dupe while holding it on the parent",
+        );
+        duped.try_lock_exclusive().await.expect_err(
+            "Shouldn't be able to acquire a lock on a dupe while holding it on the parent",
+        );
+
+        let guard2 = same_path_different_file
+            .try_lock_shared()
+            .await
+            .expect("Should be able to acquire a shared lock through a different open instance");
+
+        guard.unlock().await.unwrap();
+
+        duped.try_lock_exclusive().await.expect_err(
+            "Shouldn't be able to acquire an exclusive lock while another shared lock is still held",
+        );
+
+        guard2.unlock().await.unwrap();
+
+        duped
+            .try_lock_exclusive()
+            .await
+            .expect("Locked exclusively once the shared lock was released");
+    });
+
+    dma_file_test!(advisory_weak_lock, path, _k, {
+        let file = OpenOptions::new()
+            .create_new(true)
+            .read(true)
+            .write(true)
+            .dma_open(path.join("testfile"))
+            .await
+            .unwrap();
+
+        let exclusive = file.try_lock_exclusive().await.unwrap();
+        let exclusive_clone = exclusive.clone();
+        let weak_exclusive = exclusive.downgrade();
+
+        assert!(weak_exclusive.upgrade().is_some());
+
+        exclusive
+            .unlock()
+            .await
+            .expect_err("Shouldn't be able to unlock while multiple lock references exist");
+
+        assert!(weak_exclusive.upgrade().is_some());
+
+        exclusive_clone.unlock().await.unwrap();
+
+        assert!(weak_exclusive.upgrade().is_none());
+    });
+
+    dma_file_test!(advisory_take_last_unlock, path, _k, {
+        let file = OpenOptions::new()
+            .create_new(true)
+            .read(true)
+            .write(true)
+            .dma_open(path.join("testfile"))
+            .await
+            .unwrap();
+
+        let exclusive = file.try_lock_exclusive().await.unwrap();
+        // This drops the guard and unlocks.
+        let file = file
+            .try_take_last_clone_unlocking_guard(exclusive)
+            .await
+            .unwrap();
+        let shared = file.try_lock_shared().await.unwrap();
+        let file = file
+            .try_take_last_clone()
+            .expect_err("Can't take last clone if guard is holding it");
+        let file = file
+            .try_take_last_clone_unlocking_guard(shared)
+            .await
+            .unwrap();
+        file.close().await.unwrap();
+    });
 }
diff --git a/glommio/src/io/glommio_file.rs b/glommio/src/io/glommio_file.rs
index 4b985600d..264af69ec 100644
--- a/glommio/src/io/glommio_file.rs
+++ b/glommio/src/io/glommio_file.rs
@@ -27,6 +27,113 @@ pub(super) type Device = u64;
 pub(super) type Inode = u64;
 pub(super) type Identity = (Device, Inode);
 
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+enum AdvisoryLockState {
+    Unlocked = 0,
+    Locking = 1,
+    Locked = 2,
+}
+
+struct AdvisoryLockStateGuard<'a>(&'a AdvisoryLockStateHolder, bool);
+
+impl AdvisoryLockStateGuard<'_> {
+    fn commit(mut self) {
+        self.1 = false;
+        self.0.mark_locked();
+    }
+}
+
+impl Drop for AdvisoryLockStateGuard<'_> {
+    fn drop(&mut self) {
+        if self.1 {
+            self.0.abort_locking();
+        }
+    }
+}
+
+#[derive(Default)]
+pub(crate) struct AdvisoryLockStateHolder {
+    state: std::sync::atomic::AtomicU8,
+}
+
+impl std::fmt::Debug for AdvisoryLockStateHolder {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.write_fmt(format_args!("{:?}", self.state()))
+    }
+}
+
+impl AdvisoryLockStateHolder {
+    fn map_state(state: u8) -> AdvisoryLockState {
+        match state {
+            0 => AdvisoryLockState::Unlocked,
+            1 => AdvisoryLockState::Locking,
+            2 => AdvisoryLockState::Locked,
+            rest => {
+                panic!("Memory corruption resulting in unexpected state value? state = {rest:?}")
+            }
+        }
+    }
+
+    fn state(&self) -> AdvisoryLockState {
+        Self::map_state(self.state.load(std::sync::atomic::Ordering::Relaxed))
+    }
+
+    fn mark_locking<'a>(&'a self, file: &Path) -> Result<AdvisoryLockStateGuard<'a>> {
+        self.state
+            .compare_exchange(
+                AdvisoryLockState::Unlocked as u8,
+                AdvisoryLockState::Locking as u8,
+                std::sync::atomic::Ordering::Relaxed,
+                std::sync::atomic::Ordering::Relaxed,
+            )
+            .map(|_| AdvisoryLockStateGuard(self, true))
+            .map_err(|state| match Self::map_state(state) {
+                AdvisoryLockState::Unlocked => crate::GlommioError::WouldBlock(ResourceType::File(
+                    format!("Failed to acquire lock but current state is unlocked on {file:?}?"),
+                )),
+                AdvisoryLockState::Locking => crate::GlommioError::WouldBlock(ResourceType::File(
+                    format!("The lock is already trying to be acquired on {file:?}"),
+                )),
+                AdvisoryLockState::Locked => crate::GlommioError::WouldBlock(ResourceType::File(
+                    format!("The lock is already acquired on {file:?}"),
+                )),
+            })
+    }
+
+    fn abort_locking(&self) {
+        self.state
+            .compare_exchange(
+                AdvisoryLockState::Locking as u8,
+                AdvisoryLockState::Unlocked as u8,
+                std::sync::atomic::Ordering::Relaxed,
+                std::sync::atomic::Ordering::Relaxed,
+            )
+            .expect("Should only be called if the current state is locking");
+    }
+
+    fn mark_locked(&self) {
+        self.state
+            .compare_exchange(
+                AdvisoryLockState::Locking as u8,
+                AdvisoryLockState::Locked as u8,
+                std::sync::atomic::Ordering::Relaxed,
+                std::sync::atomic::Ordering::Relaxed,
+            )
+            .expect("Should be in the locking state if we're marking the lock as acquired!");
+    }
+
+    fn unlock(&self) {
+        self.state
+            .compare_exchange(
+                AdvisoryLockState::Locked as u8,
+                AdvisoryLockState::Unlocked as u8,
+                std::sync::atomic::Ordering::Relaxed,
+                std::sync::atomic::Ordering::Relaxed,
+            )
+            .expect("Unlock called when in unexpected state!");
+    }
+}
+
 /// A wrapper over `std::fs::File` which carries a path (for better error
 /// messages) and prints a warning if closed synchronously.
 ///
@@ -35,6 +142,7 @@ pub(super) type Identity = (Device, Inode);
 #[derive(Debug, Clone)]
 pub(crate) struct GlommioFile {
     pub(crate) file: Option<Arc<RawFd>>,
+    pub(crate) lock_state: Option<Arc<AdvisoryLockStateHolder>>,
     // A file can appear in many paths, through renaming and linking.
     // If we do that, each path should have its own object. This is to
     // facilitate error displaying.
@@ -75,6 +183,7 @@ impl FromRawFd for GlommioFile {
     unsafe fn from_raw_fd(fd: RawFd) -> Self {
         GlommioFile {
             file: Some(Arc::new(fd)),
+            lock_state: Some(Arc::new(AdvisoryLockStateHolder::default())),
             path: RefCell::new(None),
             inode: 0,
             dev_major: 0,
@@ -106,6 +215,7 @@ impl GlommioFile {
 
         let mut file = GlommioFile {
             file: Some(Arc::new(fd as _)),
+            lock_state: Some(Arc::new(Default::default())),
             path: RefCell::new(Some(path)),
             inode: 0,
             dev_major: 0,
@@ -128,6 +238,7 @@ impl GlommioFile {
             file: Some(Arc::new(nix::unistd::dup(
                 self.file.as_ref().unwrap().as_raw_fd(),
             )?)),
+            lock_state: self.lock_state.clone(),
             path: self.path.clone(),
             inode: self.inode,
             dev_major: self.dev_major,
@@ -178,8 +289,48 @@ impl GlommioFile {
         }
     }
 
+    pub(crate) async fn try_take_last_clone_unlocking_guard(
+        self,
+        guard: OwnedGlommioFile,
+    ) -> std::result::Result<Self, (Self, OwnedGlommioFile)> {
+        if guard.as_raw_fd() == self.as_raw_fd() {
+            std::mem::drop(guard);
+
+            match self.try_take_last_clone() {
+                Ok(took) => {
+                    unsafe { took.funlock().await }.expect("Unlocking advisory lock failed");
+                    Ok(took)
+                }
+                Err(failed) => {
+                    let guard = failed.clone().into();
+                    Err((failed, guard))
+                }
+            }
+        } else {
+            debug_assert!(Arc::ptr_eq(
+                self.lock_state.as_ref().unwrap(),
+                guard.lock_state.as_ref().unwrap()
+            ));
+
+            // Separate file descriptors but the same file entry. Try taking the fd...
+            match self.try_take_last_clone() {
+                Ok(took) => {
+                    let original: GlommioFile = guard.into();
+                    unsafe { original.funlock().await }.expect("Unlocking advisory lock failed");
+                    Ok(took)
+                }
+                Err(failed) => Err((failed, guard)),
+            }
+        }
+    }
+
     pub(crate) fn discard(mut self) -> (Option<RawFd>, Option<PathBuf>) {
         // Destruct `self` signalling to `Drop` that there is no need to async close.
+        // Similarly, because we're about to close, we don't need to do anything with the advisory lock - if this is
+        // the last clone of the file, then it'll unlock implicitly by closing the fd. And if it's not (e.g. a dup
+        // exists), then we can't safely unlock yet anyway.
+        self.lock_state.take().unwrap();
+
         (Arc::into_inner(self.file.take().unwrap()), self.path.take())
     }
 
@@ -205,6 +356,67 @@ impl GlommioFile {
         }
     }
 
+    async fn flock(&self, op: &'static str, flags: libc::c_int) -> Result<()> {
+        let guard = self
+            .lock_state
+            .as_ref()
+            .unwrap()
+            .mark_locking(self.path.borrow().as_ref().unwrap())?;
+
+        let res = unsafe { libc::flock(self.as_raw_fd(), flags) };
+        if res == -1 {
+            return Err(GlommioError::create_enhanced(
+                std::io::Error::last_os_error(),
+                op,
+                self.path.borrow().as_ref(),
+                Some(self.as_raw_fd()),
+            ));
+        }
+
+        guard.commit();
+
+        Ok(())
+    }
+
+    pub(crate) async fn try_lock_shared(&self) -> Result<OwnedGlommioFile> {
+        // NOTE: The try variant could just do the syscall directly instead of dispatching to a blocking thread.
+        // For consistency though it's implemented the same way.
+        self.flock("try_lock_shared", libc::LOCK_SH | libc::LOCK_NB)
+            .await
+            .map(|()| self.clone().into())
+    }
+
+    pub(crate) async fn try_lock_exclusive(&self) -> Result<OwnedGlommioFile> {
+        // NOTE: The try variant could just do the syscall directly instead of dispatching to a blocking thread.
+        // For consistency though it's implemented the same way.
+        self.flock("try_lock_exclusive", libc::LOCK_EX | libc::LOCK_NB)
+            .await
+            .map(|()| self.clone().into())
+    }
+
+    pub(crate) async unsafe fn funlock(&self) -> Result<()> {
+        let lock_state = self.lock_state.as_ref().unwrap();
+        assert_eq!(
+            lock_state.state(),
+            AdvisoryLockState::Locked,
+            "Not holding the lock!"
+        );
+
+        let res = unsafe { libc::flock(self.as_raw_fd(), libc::LOCK_UN) };
+        if res == -1 {
+            return Err(GlommioError::create_enhanced(
+                std::io::Error::last_os_error(),
+                "funlock",
+                self.path.borrow().as_ref(),
+                Some(self.as_raw_fd()),
+            ));
+        }
+
+        lock_state.unlock();
+
+        Ok(())
+    }
+
     pub(crate) fn with_path(self, path: Option<PathBuf>) -> GlommioFile {
         self.path.replace(path);
         self
@@ -350,6 +562,10 @@ impl GlommioFile {
     pub(crate) fn downgrade(&self) -> WeakGlommioFile {
         WeakGlommioFile {
             fd: self.file.as_ref().map_or(AWeak::new(), Arc::downgrade),
+            lock_state: self
+                .lock_state
+                .as_ref()
+                .map_or(AWeak::new(), Arc::downgrade),
             path: self.path.borrow().clone(),
             inode: self.inode,
             dev_major: self.dev_major,
@@ -366,6 +582,7 @@ impl GlommioFile {
 #[derive(Debug, Clone)]
 pub(crate) struct OwnedGlommioFile {
     pub(crate) fd: Option<Arc<RawFd>>,
+    pub(crate) lock_state: Option<Arc<AdvisoryLockStateHolder>>,
     pub(crate) path: Option<PathBuf>,
     pub(crate) inode: u64,
     pub(crate) dev_major: u32,
@@ -381,6 +598,7 @@ impl OwnedGlommioFile {
 
         Ok(Self {
             fd,
+            lock_state: self.lock_state.clone(),
             path: self.path.clone(),
             inode: self.inode,
             dev_major: self.dev_major,
@@ -395,12 +613,33 @@ impl OwnedGlommioFile {
     pub(crate) fn downgrade(&self) -> WeakGlommioFile {
         WeakGlommioFile {
             fd: self.fd.as_ref().map_or(AWeak::new(), Arc::downgrade),
+            lock_state: self
+                .lock_state
+                .as_ref()
+                .map_or(AWeak::new(), Arc::downgrade),
             path: self.path.clone(),
             inode: self.inode,
             dev_major: self.dev_major,
             dev_minor: self.dev_minor,
         }
     }
+
+    pub(crate) unsafe fn funlock_immediately(&mut self) {
+        let advisory_lock = self.lock_state.take().unwrap();
+        assert_eq!(
+            advisory_lock.state(),
+            AdvisoryLockState::Locked,
+            "Not holding the lock!"
+        );
+
+        nix::fcntl::flock(
+            self.fd.as_ref().unwrap().as_raw_fd(),
+            nix::fcntl::FlockArg::Unlock,
+        )
+        .unwrap();
+
+        advisory_lock.unlock();
+    }
 }
 
 impl AsRawFd for OwnedGlommioFile {
@@ -423,6 +662,7 @@ impl From<OwnedGlommioFile> for GlommioFile {
 
         GlommioFile {
             file: owned.fd.take(),
+            lock_state: owned.lock_state.take(),
             path: RefCell::new(owned.path.take()),
             inode: owned.inode,
             dev_major: owned.dev_major,
@@ -437,6 +677,7 @@ impl From<GlommioFile> for OwnedGlommioFile {
     fn from(mut value: GlommioFile) -> Self {
         Self {
             fd: value.file.take(),
+            lock_state: value.lock_state.take(),
             path: value.path.borrow_mut().take().map(|p| p.to_path_buf()),
             inode: value.inode,
             dev_major: value.dev_major,
@@ -448,6 +689,7 @@
 #[derive(Default, Debug, Clone)]
 pub(crate) struct WeakGlommioFile {
     pub(crate) fd: AWeak<RawFd>,
+    pub(crate) lock_state: AWeak<AdvisoryLockStateHolder>,
     pub(crate) path: Option<PathBuf>,
     pub(crate) inode: u64,
     pub(crate) dev_major: u32,
@@ -466,6 +708,11 @@ impl WeakGlommioFile {
     pub(crate) fn upgrade(&self) -> Option<OwnedGlommioFile> {
         self.fd.upgrade().map(|fd| OwnedGlommioFile {
             fd: Some(fd),
+            lock_state: Some(
+                self.lock_state
+                    .upgrade()
+                    .expect("If the FD is live then the advisory lock state should be too"),
+            ),
            path: self.path.clone(),
             inode: self.inode,
             dev_major: self.dev_major,
diff --git a/glommio/src/io/mod.rs b/glommio/src/io/mod.rs
index 0f49277b5..fc616d829 100644
--- a/glommio/src/io/mod.rs
+++ b/glommio/src/io/mod.rs
@@ -162,7 +162,9 @@ pub use self::{
     },
     bulk_io::{IoVec, MergedBufferLimit, ReadAmplificationLimit, ReadManyResult},
     directory::Directory,
-    dma_file::{CloseResult, DmaFile, OwnedDmaFile, WeakDmaFile},
+    dma_file::{
+        AdvisoryLockGuard, CloseResult, DmaFile, OwnedDmaFile, WeakAdvisoryLockGuard, WeakDmaFile,
+    },
     dma_file_stream::{
         DmaStreamReader, DmaStreamReaderBuilder, DmaStreamWriter, DmaStreamWriterBuilder,
     },
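
Reviewer note (not part of the patch): the snippet below is a minimal end-to-end sketch of how the new guard API is intended to compose, reusing the `tmpfile` open options from the `try_lock_shared` doc example above. The directory name is a placeholder; the code is illustrative only.

use glommio::{io::OpenOptions, LocalExecutor};

fn main() {
    let ex = LocalExecutor::default();
    ex.run(async {
        // Anonymous temporary file; "some_directory" is a placeholder and must already exist.
        let file = OpenOptions::new()
            .create_new(true)
            .read(true)
            .write(true)
            .tmpfile(true)
            .dma_open("some_directory")
            .await
            .unwrap();

        // Take the exclusive lock: flock(LOCK_EX | LOCK_NB) plus the in-process state machine.
        let exclusive = file.try_lock_exclusive().await.unwrap();

        // A second lock attempt through the same file instance is rejected by the
        // in-process state machine, independent of the kernel lock.
        assert!(file.try_lock_shared().await.is_err());

        // A weak guard does not keep the lock alive; unlocking the last strong guard releases it.
        let weak = exclusive.downgrade();
        exclusive.unlock().await.unwrap();
        assert!(weak.upgrade().is_none());

        // Re-acquire as shared, then release the lock and confirm this is the last clone in one step.
        let shared = file.try_lock_shared().await.unwrap();
        let file = file
            .try_take_last_clone_unlocking_guard(shared)
            .await
            .unwrap();
        file.close().await.unwrap();
    });
}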