Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add Connection::shrink_buffers, PoolConnection::close #2379

Merged
merged 1 commit into from
Mar 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions sqlx-core/src/any/connection/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ pub trait AnyConnectionBackend: std::any::Any + Debug + Send + 'static {
Box::pin(async move { Ok(()) })
}

/// Forward to [`Connection::shrink_buffers()`].
///
/// [`Connection::shrink_buffers()`]: method@crate::connection::Connection::shrink_buffers
fn shrink_buffers(&mut self);

#[doc(hidden)]
fn flush(&mut self) -> BoxFuture<'_, crate::Result<()>>;

Expand Down
4 changes: 4 additions & 0 deletions sqlx-core/src/any/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ impl Connection for AnyConnection {
self.backend.clear_cached_statements()
}

fn shrink_buffers(&mut self) {
self.backend.shrink_buffers()
}

#[doc(hidden)]
fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> {
self.backend.flush()
Expand Down
14 changes: 14 additions & 0 deletions sqlx-core/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,20 @@ pub trait Connection: Send {
Box::pin(async move { Ok(()) })
}

/// Restore any buffers in the connection to their default capacity, if possible.
///
/// Sending a large query or receiving a resultset with many columns can cause the connection
/// to allocate additional buffer space to fit the data which is retained afterwards in
/// case it's needed again. This can give the outward appearance of a memory leak, but is
/// in fact the intended behavior.
///
/// Calling this method tells the connection to release that excess memory if it can,
/// though be aware that calling this too often can cause unnecessary thrashing or
/// fragmentation in the global allocator. If there's still data in the connection buffers
/// (unlikely if the last query was run to completion) then it may need to be moved to
/// allow the buffers to shrink.
fn shrink_buffers(&mut self);

#[doc(hidden)]
fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>>;

Expand Down
87 changes: 66 additions & 21 deletions sqlx-core/src/net/socket/buffered.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::net::Socket;
use bytes::BytesMut;
use std::io;
use std::{cmp, io};

use crate::error::Error;

Expand Down Expand Up @@ -46,26 +46,7 @@ impl<S: Socket> BufferedSocket<S> {
}

pub async fn read_buffered(&mut self, len: usize) -> io::Result<BytesMut> {
while self.read_buf.read.len() < len {
self.read_buf.reserve(len);

let read = self.socket.read(&mut self.read_buf.available).await?;

if read == 0 {
return Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
format!(
"expected to read {} bytes, got {} bytes at EOF",
len,
self.read_buf.read.len()
),
));
}

self.read_buf.advance(read);
}

Ok(self.read_buf.drain(len))
self.read_buf.read(len, &mut self.socket).await
}

pub fn write_buffer(&self) -> &WriteBuffer {
Expand Down Expand Up @@ -123,6 +104,12 @@ impl<S: Socket> BufferedSocket<S> {
self.socket.shutdown().await
}

pub fn shrink_buffers(&mut self) {
// Won't drop data still in the buffer.
self.write_buf.shrink();
self.read_buf.shrink();
}

pub fn into_inner(self) -> S {
self.socket
}
Expand Down Expand Up @@ -197,6 +184,22 @@ impl WriteBuffer {
&mut self.buf[self.bytes_flushed..self.bytes_written]
}

pub fn shrink(&mut self) {
if self.bytes_flushed > 0 {
// Move any data that remains to be flushed to the beginning of the buffer,
// if necessary.
self.buf
.copy_within(self.bytes_flushed..self.bytes_written, 0);
self.bytes_written -= self.bytes_flushed;
self.bytes_flushed = 0
}

// Drop excess capacity.
self.buf
.truncate(cmp::max(self.bytes_written, DEFAULT_BUF_SIZE));
self.buf.shrink_to_fit();
}

fn consume(&mut self, amt: usize) {
let new_bytes_flushed = self
.bytes_flushed
Expand All @@ -218,6 +221,31 @@ impl WriteBuffer {
}

impl ReadBuffer {
async fn read(&mut self, len: usize, socket: &mut impl Socket) -> io::Result<BytesMut> {
// Because of how `BytesMut` works, we should only be shifting capacity back and forth
// between `read` and `available` unless we have to read an oversize message.
while self.read.len() < len {
self.reserve(len - self.read.len());

let read = socket.read(&mut self.available).await?;

if read == 0 {
return Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
format!(
"expected to read {} bytes, got {} bytes at EOF",
len,
self.read.len()
),
));
}

self.advance(read);
}

Ok(self.drain(len))
}

fn reserve(&mut self, amt: usize) {
if let Some(additional) = amt.checked_sub(self.available.capacity()) {
self.available.reserve(additional);
Expand All @@ -231,4 +259,21 @@ impl ReadBuffer {
fn drain(&mut self, amt: usize) -> BytesMut {
self.read.split_to(amt)
}

fn shrink(&mut self) {
if self.available.capacity() > DEFAULT_BUF_SIZE {
// `BytesMut` doesn't have a way to shrink its capacity,
// but we only use `available` for spare capacity anyway so we can just replace it.
//
// If `self.read` still contains data on the next call to `advance` then this might
// force a memcpy as they'll no longer be pointing to the same allocation,
// but that's kind of unavoidable.
//
// The `async-std` impl of `Socket` will also need to re-zero the buffer,
// but that's also kind of unavoidable.
//
// We should be warning the user not to call this often.
self.available = BytesMut::with_capacity(DEFAULT_BUF_SIZE);
}
}
}
12 changes: 12 additions & 0 deletions sqlx-core/src/pool/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,18 @@ impl<DB: Database> AsMut<DB::Connection> for PoolConnection<DB> {
}

impl<DB: Database> PoolConnection<DB> {
/// Close this connection, allowing the pool to open a replacement.
///
/// Equivalent to calling [`.detach()`] then [`.close()`], but the connection permit is retained
/// for the duration so that the pool may not exceed `max_connections`.
///
/// [`.detach()`]: PoolConnection::detach
/// [`.close()`]: Connection::close
pub async fn close(mut self) -> Result<(), Error> {
let floating = self.take_live().float(self.pool.clone());
floating.inner.raw.close().await
}

/// Detach this connection from the pool, allowing it to open a replacement.
///
/// Note that if your application uses a single shared pool, this
Expand Down
4 changes: 4 additions & 0 deletions sqlx-mysql/src/any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ impl AnyConnectionBackend for MySqlConnection {
MySqlTransactionManager::start_rollback(self)
}

fn shrink_buffers(&mut self) {
Connection::shrink_buffers(self);
}

fn flush(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> {
Connection::flush(self)
}
Expand Down
4 changes: 4 additions & 0 deletions sqlx-mysql/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,4 +108,8 @@ impl Connection for MySqlConnection {
{
Transaction::begin(self)
}

fn shrink_buffers(&mut self) {
self.stream.shrink_buffers();
}
}
4 changes: 4 additions & 0 deletions sqlx-postgres/src/any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ impl AnyConnectionBackend for PgConnection {
PgTransactionManager::start_rollback(self)
}

fn shrink_buffers(&mut self) {
Connection::shrink_buffers(self);
}

fn flush(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> {
Connection::flush(self)
}
Expand Down
4 changes: 4 additions & 0 deletions sqlx-postgres/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,10 @@ impl Connection for PgConnection {
})
}

fn shrink_buffers(&mut self) {
self.stream.shrink_buffers();
}

#[doc(hidden)]
fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> {
self.wait_until_ready().boxed()
Expand Down
4 changes: 4 additions & 0 deletions sqlx-sqlite/src/any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ impl AnyConnectionBackend for SqliteConnection {
SqliteTransactionManager::start_rollback(self)
}

fn shrink_buffers(&mut self) {
// NO-OP.
}

fn flush(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> {
Connection::flush(self)
}
Expand Down
5 changes: 5 additions & 0 deletions sqlx-sqlite/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,11 @@ impl Connection for SqliteConnection {
})
}

#[inline]
fn shrink_buffers(&mut self) {
// No-op.
}

#[doc(hidden)]
fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> {
// For SQLite, FLUSH does effectively nothing...
Expand Down
31 changes: 31 additions & 0 deletions tests/mysql/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,3 +446,34 @@ async fn it_can_work_with_transactions() -> anyhow::Result<()> {

Ok(())
}

#[sqlx_macros::test]
async fn test_shrink_buffers() -> anyhow::Result<()> {
// We don't really have a good way to test that `.shrink_buffers()` functions as expected
// without exposing a lot of internals, but we can at least be sure it doesn't
// materially affect the operation of the connection.

let mut conn = new::<MySql>().await?;

// The connection buffer is only 8 KiB by default so this should definitely force it to grow.
let data = "This string should be 32 bytes!\n".repeat(1024);
assert_eq!(data.len(), 32 * 1024);

let ret: String = sqlx::query_scalar("SELECT ?")
.bind(&data)
.fetch_one(&mut conn)
.await?;

assert_eq!(ret, data);

conn.shrink_buffers();

let ret: i64 = sqlx::query_scalar("SELECT ?")
.bind(&12345678i64)
.fetch_one(&mut conn)
.await?;

assert_eq!(ret, 12345678i64);

Ok(())
}
30 changes: 30 additions & 0 deletions tests/postgres/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1789,3 +1789,33 @@ async fn test_postgres_bytea_hex_deserialization_errors() -> anyhow::Result<()>
}
Ok(())
}

#[sqlx_macros::test]
async fn test_shrink_buffers() -> anyhow::Result<()> {
// We don't really have a good way to test that `.shrink_buffers()` functions as expected
// without exposing a lot of internals, but we can at least be sure it doesn't
// materially affect the operation of the connection.

let mut conn = new::<Postgres>().await?;

// The connection buffer is only 8 KiB by default so this should definitely force it to grow.
let data = vec![0u8; 32 * 1024];

let ret: Vec<u8> = sqlx::query_scalar("SELECT $1::bytea")
.bind(&data)
.fetch_one(&mut conn)
.await?;

assert_eq!(ret, data);

conn.shrink_buffers();

let ret: i64 = sqlx::query_scalar("SELECT $1::int8")
.bind(&12345678i64)
.fetch_one(&mut conn)
.await?;

assert_eq!(ret, 12345678i64);

Ok(())
}