Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a panic in the worker thread when dropping the connection while SqliteRows still exist #1450

Merged
merged 2 commits into from
Sep 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 5 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 6 additions & 3 deletions sqlx-core/src/sqlite/connection/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ impl<'c> Executor<'c> for &'c mut SqliteConnection {

Either::Right(()) => {
let (row, weak_values_ref) = SqliteRow::current(
&stmt,
stmt.to_ref(conn.to_ref()),
columns,
column_names
);
Expand Down Expand Up @@ -216,8 +216,11 @@ impl<'c> Executor<'c> for &'c mut SqliteConnection {
Either::Left(_) => (),

Either::Right(()) => {
let (row, weak_values_ref) =
SqliteRow::current(stmt, columns, column_names);
let (row, weak_values_ref) = SqliteRow::current(
stmt.to_ref(self.handle.to_ref()),
columns,
column_names,
);

*last_row_values = Some(weak_values_ref);

Expand Down
1 change: 1 addition & 0 deletions sqlx-core/src/sqlite/connection/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub(crate) struct ConnectionHandle(Arc<HandleInner>);
/// or `sqlite3_reset()`.
///
/// Note that this does *not* actually give access to the database handle!
#[derive(Clone, Debug)]
pub(crate) struct ConnectionHandleRef(Arc<HandleInner>);

// Wrapper for `*mut sqlite3` which finalizes the handle on-drop.
Expand Down
3 changes: 1 addition & 2 deletions sqlx-core/src/sqlite/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,7 @@ impl Connection for SqliteConnection {

impl Drop for SqliteConnection {
fn drop(&mut self) {
// before the connection handle is dropped,
// we must explicitly drop the statements as the drop-order in a struct is undefined
// explicitly drop statements before the connection handle is dropped
self.statements.clear();
self.statement.take();
}
Expand Down
8 changes: 4 additions & 4 deletions sqlx-core/src/sqlite/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::column::ColumnIndex;
use crate::error::Error;
use crate::ext::ustr::UStr;
use crate::row::Row;
use crate::sqlite::statement::StatementHandle;
use crate::sqlite::statement::{StatementHandle, StatementHandleRef};
use crate::sqlite::{Sqlite, SqliteColumn, SqliteValue, SqliteValueRef};

/// Implementation of [`Row`] for SQLite.
Expand All @@ -23,7 +23,7 @@ pub struct SqliteRow {
// IF the user drops the Row before iterating the stream (so
// nearly all of our internal stream iterators), the executor moves on; otherwise,
// it actually inflates this row with a list of owned sqlite3 values.
pub(crate) statement: Arc<StatementHandle>,
pub(crate) statement: StatementHandleRef,

pub(crate) values: Arc<AtomicPtr<SqliteValue>>,
pub(crate) num_values: usize,
Expand All @@ -48,7 +48,7 @@ impl SqliteRow {
// returns a weak reference to an atomic list where the executor should inflate if its going
// to increment the statement with [step]
pub(crate) fn current(
statement: &Arc<StatementHandle>,
statement: StatementHandleRef,
columns: &Arc<Vec<SqliteColumn>>,
column_names: &Arc<HashMap<UStr, usize>>,
) -> (Self, Weak<AtomicPtr<SqliteValue>>) {
Expand All @@ -57,7 +57,7 @@ impl SqliteRow {
let size = statement.column_count();

let row = Self {
statement: Arc::clone(statement),
statement,
values,
num_values: size,
columns: Arc::clone(columns),
Expand Down
32 changes: 32 additions & 0 deletions sqlx-core/src/sqlite/statement/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,25 @@ use libsqlite3_sys::{
};

use crate::error::{BoxDynError, Error};
use crate::sqlite::connection::ConnectionHandleRef;
use crate::sqlite::type_info::DataType;
use crate::sqlite::{SqliteError, SqliteTypeInfo};
use std::ops::Deref;
use std::sync::Arc;

#[derive(Debug)]
pub(crate) struct StatementHandle(NonNull<sqlite3_stmt>);

// wrapper for `Arc<StatementHandle>` which also holds a reference to the `ConnectionHandle`
#[derive(Clone, Debug)]
pub(crate) struct StatementHandleRef {
// NOTE: the ordering of fields here determines the drop order:
// https://doc.rust-lang.org/reference/destructors.html#destructors
// the statement *must* be dropped before the connection
statement: Arc<StatementHandle>,
connection: ConnectionHandleRef,
}

// access to SQLite3 statement handles are safe to send and share between threads
// as long as the `sqlite3_step` call is serialized.

Expand Down Expand Up @@ -292,7 +305,18 @@ impl StatementHandle {
pub(crate) fn clear_bindings(&self) {
unsafe { sqlite3_clear_bindings(self.0.as_ptr()) };
}

pub(crate) fn to_ref(
self: &Arc<StatementHandle>,
conn: ConnectionHandleRef,
) -> StatementHandleRef {
StatementHandleRef {
statement: Arc::clone(self),
connection: conn,
}
}
}

impl Drop for StatementHandle {
fn drop(&mut self) {
// SAFETY: we have exclusive access to the `StatementHandle` here
Expand All @@ -311,3 +335,11 @@ impl Drop for StatementHandle {
}
}
}

impl Deref for StatementHandleRef {
type Target = StatementHandle;

fn deref(&self) -> &Self::Target {
&self.statement
}
}
2 changes: 1 addition & 1 deletion sqlx-core/src/sqlite/statement/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ mod handle;
mod r#virtual;
mod worker;

pub(crate) use handle::StatementHandle;
pub(crate) use handle::{StatementHandle, StatementHandleRef};
pub(crate) use r#virtual::VirtualStatement;
pub(crate) use worker::StatementWorker;

Expand Down
23 changes: 23 additions & 0 deletions tests/sqlite/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -567,3 +567,26 @@ async fn concurrent_resets_dont_segfault() {

sqlx_rt::sleep(Duration::from_millis(1)).await;
}

// https://github.com/launchbadge/sqlx/issues/1419
// note: this passes before and after the fix; you need to run it with `--nocapture`
// to see the panic from the worker thread, which doesn't happen after the fix
#[sqlx_macros::test]
async fn row_dropped_after_connection_doesnt_panic() {
let mut conn = SqliteConnection::connect(":memory:").await.unwrap();

let books = sqlx::query("SELECT 'hello' AS title")
.fetch_all(&mut conn)
.await
.unwrap();

for book in &books {
// force the row to be inflated
let _title: String = book.get("title");
}

// hold `books` past the lifetime of `conn`
drop(conn);
sqlx_rt::sleep(std::time::Duration::from_secs(1)).await;
drop(books);
}