Skip to content

Commit

Permalink
Replace parking_lot Mutex with std::sync::Mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
nnmm committed Jul 24, 2022
1 parent 214470a commit c654a16
Show file tree
Hide file tree
Showing 10 changed files with 50 additions and 64 deletions.
2 changes: 0 additions & 2 deletions rclrs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ path = "src/lib.rs"
[dependencies]
# Needed for FFI
libc = "0.2.43"
# Provides better concurrency primitives than std
parking_lot = "0.11.2"
# Needed for the Message trait, among others
rosidl_runtime_rs = "0.2.0"
# Needed for clients
Expand Down
6 changes: 2 additions & 4 deletions rclrs/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@ use crate::{RclrsError, ToResult};
use std::ffi::CString;
use std::os::raw::c_char;
use std::string::String;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::vec::Vec;

use parking_lot::Mutex;

impl Drop for rcl_context_t {
fn drop(&mut self) {
unsafe {
Expand Down Expand Up @@ -112,7 +110,7 @@ impl Context {
pub fn ok(&self) -> bool {
// This will currently always return true, but once we have a signal handler, the signal
// handler could call `rcl_shutdown()`, hence making the context invalid.
let rcl_context = &mut *self.rcl_context_mtx.lock();
let rcl_context = &mut *self.rcl_context_mtx.lock().unwrap();
// SAFETY: No preconditions for this function.
unsafe { rcl_context_is_valid(rcl_context) }
}
Expand Down
6 changes: 4 additions & 2 deletions rclrs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,12 @@ pub fn spin(node: &Node) -> Result<(), RclrsError> {
// The context_is_valid functions exists only to abstract away ROS distro differences
#[cfg(ros_distro = "foxy")]
// SAFETY: No preconditions for this function.
let context_is_valid = || unsafe { rcl_context_is_valid(&mut *node.rcl_context_mtx.lock()) };
let context_is_valid =
|| unsafe { rcl_context_is_valid(&mut *node.rcl_context_mtx.lock().unwrap()) };
#[cfg(not(ros_distro = "foxy"))]
// SAFETY: No preconditions for this function.
let context_is_valid = || unsafe { rcl_context_is_valid(&*node.rcl_context_mtx.lock()) };
let context_is_valid =
|| unsafe { rcl_context_is_valid(&*node.rcl_context_mtx.lock().unwrap()) };

while context_is_valid() {
match spin_once(node, None) {
Expand Down
8 changes: 3 additions & 5 deletions rclrs/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,10 @@ use crate::{Context, ParameterOverrideMap, QoSProfile, RclrsError, ToResult};
use std::cmp::PartialEq;
use std::ffi::CStr;
use std::fmt;
use std::sync::{Arc, Weak};
use std::sync::{Arc, Mutex, Weak};
use std::vec::Vec;

use libc::c_char;
use parking_lot::Mutex;

use rosidl_runtime_rs::Message;

impl Drop for rcl_node_t {
Expand Down Expand Up @@ -177,7 +175,7 @@ impl Node {
&self,
getter: unsafe extern "C" fn(*const rcl_node_t) -> *const c_char,
) -> String {
unsafe { call_string_getter_with_handle(&*self.rcl_node_mtx.lock(), getter) }
unsafe { call_string_getter_with_handle(&*self.rcl_node_mtx.lock().unwrap(), getter) }
}

/// Creates a [`Client`][1].
Expand Down Expand Up @@ -291,7 +289,7 @@ impl Node {
// add description about this function is for getting actual domain_id
// and about override of domain_id via node option
pub fn domain_id(&self) -> usize {
let rcl_node = &*self.rcl_node_mtx.lock();
let rcl_node = &*self.rcl_node_mtx.lock().unwrap();
let mut domain_id: usize = 0;
let ret = unsafe {
// SAFETY: No preconditions for this function.
Expand Down
6 changes: 2 additions & 4 deletions rclrs/src/node/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ use crate::{
};

use std::ffi::CString;
use std::sync::Arc;

use parking_lot::Mutex;
use std::sync::{Arc, Mutex};

/// A builder for creating a [`Node`][1].
///
Expand Down Expand Up @@ -245,7 +243,7 @@ impl NodeBuilder {
s: self.namespace.clone(),
})?;
let rcl_node_options = self.create_rcl_node_options()?;
let rcl_context = &mut *self.context.lock();
let rcl_context = &mut *self.context.lock().unwrap();

// SAFETY: Getting a zero-initialized value is always safe.
let mut rcl_node = unsafe { rcl_get_zero_initialized_node() };
Expand Down
21 changes: 10 additions & 11 deletions rclrs/src/node/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@ use std::boxed::Box;
use std::collections::HashMap;
use std::ffi::CString;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::sync::{Arc, Mutex, MutexGuard};

use crate::error::{RclReturnCode, ToResult};
use crate::MessageCow;
use crate::Node;
use crate::{rcl_bindings::*, RclrsError};

use parking_lot::{Mutex, MutexGuard};
use rosidl_runtime_rs::Message;

// SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread
Expand All @@ -26,17 +25,17 @@ pub struct ClientHandle {

impl ClientHandle {
pub(crate) fn lock(&self) -> MutexGuard<rcl_client_t> {
self.rcl_client_mtx.lock()
self.rcl_client_mtx.lock().unwrap()
}
}

impl Drop for ClientHandle {
fn drop(&mut self) {
let handle = self.rcl_client_mtx.get_mut();
let rcl_node_mtx = &mut *self.rcl_node_mtx.lock();
let rcl_client = self.rcl_client_mtx.get_mut().unwrap();
let rcl_node_mtx = &mut *self.rcl_node_mtx.lock().unwrap();
// SAFETY: No preconditions for this function
unsafe {
rcl_client_fini(handle, rcl_node_mtx);
rcl_client_fini(rcl_client, rcl_node_mtx);
}
}
}
Expand Down Expand Up @@ -87,7 +86,7 @@ where
err,
s: topic.into(),
})?;
let rcl_node = { &mut *node.rcl_node_mtx.lock() };
let rcl_node = { &mut *node.rcl_node_mtx.lock().unwrap() };

// SAFETY: No preconditions for this function.
let client_options = unsafe { rcl_client_get_default_options() };
Expand Down Expand Up @@ -153,7 +152,7 @@ where
)
}
.ok()?;
let requests = &mut *self.requests.lock();
let requests = &mut *self.requests.lock().unwrap();
requests.insert(sequence_number, Box::new(callback));
Ok(())
}
Expand Down Expand Up @@ -189,7 +188,7 @@ where
}
.ok()?;
let (tx, rx) = oneshot::channel::<T::Response>();
self.futures.lock().insert(sequence_number, tx);
self.futures.lock().unwrap().insert(sequence_number, tx);
// It is safe to call unwrap() here since the `Canceled` error will only happen when the
// `Sender` is dropped
// https://docs.rs/futures/latest/futures/channel/oneshot/struct.Canceled.html
Expand Down Expand Up @@ -261,8 +260,8 @@ where
}
Err(e) => return Err(e),
};
let requests = &mut *self.requests.lock();
let futures = &mut *self.futures.lock();
let requests = &mut *self.requests.lock().unwrap();
let futures = &mut *self.futures.lock().unwrap();
if let Some(callback) = requests.remove(&req_id.sequence_number) {
callback(res);
} else if let Some(future) = futures.remove(&req_id.sequence_number) {
Expand Down
15 changes: 7 additions & 8 deletions rclrs/src/node/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ use std::borrow::Cow;
use std::ffi::CStr;
use std::ffi::CString;
use std::marker::PhantomData;
use std::sync::Arc;

use parking_lot::Mutex;
use std::sync::{Arc, Mutex};

use rosidl_runtime_rs::{Message, RmwMessage};

Expand Down Expand Up @@ -44,8 +42,8 @@ where
unsafe {
// SAFETY: No preconditions for this function (besides the arguments being valid).
rcl_publisher_fini(
self.rcl_publisher_mtx.get_mut(),
&mut *self.rcl_node_mtx.lock(),
self.rcl_publisher_mtx.get_mut().unwrap(),
&mut *self.rcl_node_mtx.lock().unwrap(),
);
}
}
Expand All @@ -70,7 +68,7 @@ where
err,
s: topic.into(),
})?;
let rcl_node = &mut *node.rcl_node_mtx.lock();
let rcl_node = &mut *node.rcl_node_mtx.lock().unwrap();

// SAFETY: No preconditions for this function.
let mut publisher_options = unsafe { rcl_publisher_get_default_options() };
Expand Down Expand Up @@ -106,7 +104,8 @@ where
// SAFETY: No preconditions for the functions called.
// The unsafe variables created get converted to safe types before being returned
unsafe {
let raw_topic_pointer = rcl_publisher_get_topic_name(&*self.rcl_publisher_mtx.lock());
let raw_topic_pointer =
rcl_publisher_get_topic_name(&*self.rcl_publisher_mtx.lock().unwrap());
CStr::from_ptr(raw_topic_pointer)
.to_string_lossy()
.into_owned()
Expand All @@ -131,7 +130,7 @@ where
/// [1]: https://github.com/ros2/ros2/issues/255
pub fn publish<'a, M: MessageCow<'a, T>>(&self, message: M) -> Result<(), RclrsError> {
let rmw_message = T::into_rmw_message(message.into_cow());
let rcl_publisher = &mut *self.rcl_publisher_mtx.lock();
let rcl_publisher = &mut *self.rcl_publisher_mtx.lock().unwrap();
unsafe {
// SAFETY: The message type is guaranteed to match the publisher type by the type system.
// The message does not need to be valid beyond the duration of this function call.
Expand Down
30 changes: 14 additions & 16 deletions rclrs/src/node/service.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::boxed::Box;
use std::ffi::CString;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::sync::{Arc, Mutex, MutexGuard};

use crate::error::{RclReturnCode, ToResult};
use crate::Node;
Expand All @@ -11,32 +11,30 @@ use rosidl_runtime_rs::Message;

use crate::node::publisher::MessageCow;

use parking_lot::{Mutex, MutexGuard};

// SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread
// they are running in. Therefore, this type can be safely sent to another thread.
unsafe impl Send for rcl_service_t {}

/// Internal struct used by services.
pub struct ServiceHandle {
handle: Mutex<rcl_service_t>,
node_handle: Arc<Mutex<rcl_node_t>>,
rcl_service_mtx: Mutex<rcl_service_t>,
rcl_node_mtx: Arc<Mutex<rcl_node_t>>,
pub(crate) in_use_by_wait_set: Arc<AtomicBool>,
}

impl ServiceHandle {
pub(crate) fn lock(&self) -> MutexGuard<rcl_service_t> {
self.handle.lock()
self.rcl_service_mtx.lock().unwrap()
}
}

impl Drop for ServiceHandle {
fn drop(&mut self) {
let handle = self.handle.get_mut();
let node_handle = &mut *self.node_handle.lock();
let rcl_service = self.rcl_service_mtx.get_mut().unwrap();
let rcl_node = &mut *self.rcl_node_mtx.lock().unwrap();
// SAFETY: No preconditions for this function
unsafe {
rcl_service_fini(handle, node_handle);
rcl_service_fini(rcl_service, rcl_node);
}
}
}
Expand Down Expand Up @@ -80,14 +78,14 @@ where
F: Fn(&rmw_request_id_t, T::Request) -> T::Response + 'static + Send,
{
// SAFETY: Getting a zero-initialized value is always safe.
let mut service_handle = unsafe { rcl_get_zero_initialized_service() };
let mut rcl_service = unsafe { rcl_get_zero_initialized_service() };
let type_support = <T as rosidl_runtime_rs::Service>::get_type_support()
as *const rosidl_service_type_support_t;
let topic_c_string = CString::new(topic).map_err(|err| RclrsError::StringContainsNul {
err,
s: topic.into(),
})?;
let node_handle = &mut *node.rcl_node_mtx.lock();
let rcl_node = &mut *node.rcl_node_mtx.lock().unwrap();

// SAFETY: No preconditions for this function.
let service_options = unsafe { rcl_service_get_default_options() };
Expand All @@ -98,8 +96,8 @@ where
// The topic name and the options are copied by this function, so they can be dropped
// afterwards.
rcl_service_init(
&mut service_handle as *mut _,
node_handle as *mut _,
&mut rcl_service as *mut _,
rcl_node as *mut _,
type_support,
topic_c_string.as_ptr(),
&service_options as *const _,
Expand All @@ -108,8 +106,8 @@ where
}

let handle = Arc::new(ServiceHandle {
handle: Mutex::new(service_handle),
node_handle: node.rcl_node_mtx.clone(),
rcl_service_mtx: Mutex::new(rcl_service),
rcl_node_mtx: node.rcl_node_mtx.clone(),
in_use_by_wait_set: Arc::new(AtomicBool::new(false)),
});

Expand Down Expand Up @@ -184,7 +182,7 @@ where
}
Err(e) => return Err(e),
};
let res = (*self.callback.lock())(&req_id, req);
let res = (*self.callback.lock().unwrap())(&req_id, req);
let rmw_message = <T::Response as Message>::into_rmw_message(res.into_cow());
let handle = &*self.handle.lock();
unsafe {
Expand Down
14 changes: 6 additions & 8 deletions rclrs/src/node/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,10 @@ use std::ffi::CStr;
use std::ffi::CString;
use std::marker::PhantomData;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::sync::{Arc, Mutex, MutexGuard};

use rosidl_runtime_rs::{Message, RmwMessage};

use parking_lot::{Mutex, MutexGuard};

// SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread
// they are running in. Therefore, this type can be safely sent to another thread.
unsafe impl Send for rcl_subscription_t {}
Expand All @@ -27,14 +25,14 @@ pub struct SubscriptionHandle {

impl SubscriptionHandle {
pub(crate) fn lock(&self) -> MutexGuard<rcl_subscription_t> {
self.rcl_subscription_mtx.lock()
self.rcl_subscription_mtx.lock().unwrap()
}
}

impl Drop for SubscriptionHandle {
fn drop(&mut self) {
let rcl_subscription = self.rcl_subscription_mtx.get_mut();
let rcl_node = &mut *self.rcl_node_mtx.lock();
let rcl_subscription = self.rcl_subscription_mtx.get_mut().unwrap();
let rcl_node = &mut *self.rcl_node_mtx.lock().unwrap();
// SAFETY: No preconditions for this function (besides the arguments being valid).
unsafe {
rcl_subscription_fini(rcl_subscription, rcl_node);
Expand Down Expand Up @@ -99,7 +97,7 @@ where
err,
s: topic.into(),
})?;
let rcl_node = &mut *node.rcl_node_mtx.lock();
let rcl_node = &mut *node.rcl_node_mtx.lock().unwrap();

// SAFETY: No preconditions for this function.
let mut subscription_options = unsafe { rcl_subscription_get_default_options() };
Expand Down Expand Up @@ -210,7 +208,7 @@ where
}
Err(e) => return Err(e),
};
(*self.callback.lock())(msg);
(*self.callback.lock().unwrap())(msg);
Ok(())
}
}
6 changes: 2 additions & 4 deletions rclrs/src/wait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@ use crate::error::{to_rclrs_result, RclReturnCode, RclrsError, ToResult};
use crate::rcl_bindings::*;
use crate::{ClientBase, Context, ServiceBase, SubscriptionBase};

use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::vec::Vec;

use parking_lot::Mutex;

mod exclusivity_guard;
use exclusivity_guard::*;

Expand Down Expand Up @@ -88,7 +86,7 @@ impl WaitSet {
number_of_clients,
number_of_services,
number_of_events,
&mut *context.rcl_context_mtx.lock(),
&mut *context.rcl_context_mtx.lock().unwrap(),
rcutils_get_default_allocator(),
)
.ok()?;
Expand Down

0 comments on commit c654a16

Please sign in to comment.