Skip to content

Commit

Permalink
fix(sqlite): have StatementWorker keep a strong ref to `ConnectionH…
Browse files Browse the repository at this point in the history
…andle`

this should prevent the database handle from being finalized before all statement handles
have been finalized
  • Loading branch information
abonander committed Jul 31, 2021
1 parent b4cf1a0 commit ac57b1a
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 10 deletions.
6 changes: 3 additions & 3 deletions sqlx-core/src/sqlite/connection/establish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ pub(crate) async fn establish(options: &SqliteConnectOptions) -> Result<SqliteCo
// https://www.sqlite.org/c3ref/extended_result_codes.html
unsafe {
// NOTE: ignore the failure here
sqlite3_extended_result_codes(handle.0.as_ptr(), 1);
sqlite3_extended_result_codes(handle.as_ptr(), 1);
}

// Configure a busy timeout
Expand All @@ -99,7 +99,7 @@ pub(crate) async fn establish(options: &SqliteConnectOptions) -> Result<SqliteCo
let ms =
i32::try_from(busy_timeout.as_millis()).expect("Given busy timeout value is too big.");

status = unsafe { sqlite3_busy_timeout(handle.0.as_ptr(), ms) };
status = unsafe { sqlite3_busy_timeout(handle.as_ptr(), ms) };

if status != SQLITE_OK {
return Err(Error::Database(Box::new(SqliteError::new(handle.as_ptr()))));
Expand All @@ -109,8 +109,8 @@ pub(crate) async fn establish(options: &SqliteConnectOptions) -> Result<SqliteCo
})?;

Ok(SqliteConnection {
worker: StatementWorker::new(handle.to_ref()),
handle,
worker: StatementWorker::new(),
statements: StatementCache::new(options.statement_cache_capacity),
statement: None,
transaction_depth: 0,
Expand Down
35 changes: 30 additions & 5 deletions sqlx-core/src/sqlite/connection/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,23 @@ use std::ptr::NonNull;
use libsqlite3_sys::{sqlite3, sqlite3_close, SQLITE_OK};

use crate::sqlite::SqliteError;
use std::sync::Arc;

/// Managed handle to the raw SQLite3 database handle.
/// The database handle will be closed when this is dropped.
/// The database handle will be closed when this is dropped and no `ConnectionHandleRef`s exist.
#[derive(Debug)]
pub(crate) struct ConnectionHandle(pub(super) NonNull<sqlite3>);
pub(crate) struct ConnectionHandle(Arc<HandleInner>);

/// A wrapper around `ConnectionHandle` which only exists for a `StatementWorker` to own
/// which prevents the `sqlite3` handle from being finalized while it is running `sqlite3_step()`
/// or `sqlite3_reset()`.
///
/// Note that this does *not* actually give access to the database handle!
pub(crate) struct ConnectionHandleRef(Arc<HandleInner>);

// Wrapper for `*mut sqlite3` which finalizes the handle on-drop.
#[derive(Debug)]
struct HandleInner(NonNull<sqlite3>);

// A SQLite3 handle is safe to send between threads, provided not more than
// one is accessing it at the same time. This is upheld as long as [SQLITE_CONFIG_MULTITHREAD] is
Expand All @@ -20,19 +32,32 @@ pub(crate) struct ConnectionHandle(pub(super) NonNull<sqlite3>);

unsafe impl Send for ConnectionHandle {}

// SAFETY: `Arc<T>` normally only implements `Send` where `T: Sync` because it allows
// concurrent access.
//
// However, in this case we're only using `Arc` to prevent the database handle from being
// finalized while the worker still holds a statement handle; `ConnectionHandleRef` thus
// should *not* actually provide access to the database handle.
unsafe impl Send for ConnectionHandleRef {}

impl ConnectionHandle {
#[inline]
pub(super) unsafe fn new(ptr: *mut sqlite3) -> Self {
Self(NonNull::new_unchecked(ptr))
Self(Arc::new(HandleInner(NonNull::new_unchecked(ptr))))
}

#[inline]
pub(crate) fn as_ptr(&self) -> *mut sqlite3 {
self.0.as_ptr()
self.0 .0.as_ptr()
}

#[inline]
pub(crate) fn to_ref(&self) -> ConnectionHandleRef {
ConnectionHandleRef(Arc::clone(&self.0))
}
}

impl Drop for ConnectionHandle {
impl Drop for HandleInner {
fn drop(&mut self) {
unsafe {
// https://sqlite.org/c3ref/close.html
Expand Down
2 changes: 1 addition & 1 deletion sqlx-core/src/sqlite/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ mod executor;
mod explain;
mod handle;

pub(crate) use handle::ConnectionHandle;
pub(crate) use handle::{ConnectionHandle, ConnectionHandleRef};

/// A connection to a [Sqlite] database.
pub struct SqliteConnection {
Expand Down
7 changes: 6 additions & 1 deletion sqlx-core/src/sqlite/statement/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use futures_channel::oneshot;
use std::sync::{Arc, Weak};
use std::thread;

use crate::sqlite::connection::ConnectionHandleRef;

use libsqlite3_sys::{sqlite3_reset, sqlite3_step, SQLITE_DONE, SQLITE_ROW};
use std::future::Future;

Expand All @@ -31,7 +33,7 @@ enum StatementWorkerCommand {
}

impl StatementWorker {
pub(crate) fn new() -> Self {
pub(crate) fn new(conn: ConnectionHandleRef) -> Self {
let (tx, rx) = unbounded();

thread::spawn(move || {
Expand Down Expand Up @@ -72,6 +74,9 @@ impl StatementWorker {
}
}
}

// SAFETY: we need to make sure a strong ref to `conn` always outlives anything in `rx`
drop(conn);
});

Self { tx }
Expand Down
Binary file modified tests/sqlite/sqlite.db
Binary file not shown.

0 comments on commit ac57b1a

Please sign in to comment.