Skip to content

Commit

Permalink
WIP feat: reintroduce Pool
Browse files Browse the repository at this point in the history
Signed-off-by: Austin Bonander <austin@launchbadge.com>
  • Loading branch information
abonander committed Apr 29, 2021
1 parent e7664d8 commit d1b1b31
Show file tree
Hide file tree
Showing 12 changed files with 1,320 additions and 5 deletions.
27 changes: 27 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 30 additions & 0 deletions script/enforce-new-mod-style.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#!/usr/bin/env bash

# This script scans the project for `mod.rs` files and exits with a nonzero code if it finds any.
#
# You can also call it with `--fix` to replace any `mod.rs` files with their 2018 edition equivalents.
# The new files will be staged for commit for convenience.

FILES=$(find ./ -name mod.rs -print)

if [[ -z $FILES ]]; then
exit 0
fi

if [ "$1" != "--fix" ]; then
echo 'This project uses the Rust 2018 module style. mod.rs files are forbidden.'
echo "Execute \`$0 --fix\` to replace these with their 2018 equivalents and stage for commit."
echo 'Found mod.rs files:'
echo "$FILES"
exit 1
fi

echo 'Fixing Rust 2018 Module Style'

while read -r file; do
dest="$(dirname $file).rs"
echo "$file -> $dest"
mv $file $dest
git add $dest
done <<< $FILES

27 changes: 22 additions & 5 deletions sqlx-core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ pub enum Error {
/// The database URL is malformed or contains invalid or unsupported
/// values for one or more options; a value of [`ConnectOptions`] failed
/// to be parsed.
ConnectOptions { message: Cow<'static, str>, source: Option<Box<dyn StdError + Send + Sync>> },
ConnectOptions {
message: Cow<'static, str>,
source: Option<Box<dyn StdError + Send + Sync>>,
},

/// An error that was returned from the database, normally from the
/// execution of a SQL command.
Expand Down Expand Up @@ -65,26 +68,40 @@ pub enum Error {
///
Closed,

AcquireTimedOut,

/// An error occurred decoding a SQL value from the database.
Decode(DecodeError),

/// An error occurred encoding a value to be sent to the database.
Encode(EncodeError),

/// An attempt to access a column by index past the end of the row.
ColumnIndexOutOfBounds { index: usize, len: usize },
ColumnIndexOutOfBounds {
index: usize,
len: usize,
},

/// An attempt to access a column by name where no such column is
/// present in the row.
ColumnNotFound { name: Box<str> },
ColumnNotFound {
name: Box<str>,
},

/// An error occurred decoding a SQL value of a specific column
/// from the database.
ColumnDecode { column_index: usize, column_name: Box<str>, source: DecodeError },
ColumnDecode {
column_index: usize,
column_name: Box<str>,
source: DecodeError,
},

/// An error occurred encoding a value for a specific parameter to
/// be sent to the database.
ParameterEncode { parameter: Either<usize, Box<str>>, source: EncodeError },
ParameterEncode {
parameter: Either<usize, Box<str>>,
source: EncodeError,
},
}

impl Error {
Expand Down
11 changes: 11 additions & 0 deletions sqlx-core/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ mod tokio_;
pub use actix_::Actix;
#[cfg(feature = "async-std")]
pub use async_std_::AsyncStd;
use std::future::Future;
use std::time::Duration;
#[cfg(feature = "tokio")]
pub use tokio_::Tokio;

Expand Down Expand Up @@ -82,6 +84,15 @@ pub trait Runtime: 'static + Send + Sync + Sized + Debug {
fn connect_unix_async(path: &Path) -> BoxFuture<'_, io::Result<Self::UnixStream>>
where
Self: Async;

#[doc(hidden)]
#[cfg(all(unix, feature = "async"))]
fn timeout_async<'a, F: Future + Send + 'a>(
fut: F,
timeout: Duration,
) -> BoxFuture<'a, Option<F::Output>>
where
Self: Async;
}

/// Marks a [`Runtime`] as being capable of handling asynchronous execution.
Expand Down
10 changes: 10 additions & 0 deletions sqlx-core/src/runtime/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ use futures_util::{AsyncReadExt, AsyncWriteExt, FutureExt, TryFutureExt};

use crate::io::Stream;
use crate::{Async, Runtime};
use std::future::Future;
use std::time::{Duration, Instant};

/// Provides [`Runtime`] for [**Tokio**](https://tokio.rs). Supports only non-blocking operation.
///
Expand Down Expand Up @@ -55,6 +57,14 @@ impl Runtime for Tokio {
fn connect_unix_async(path: &Path) -> BoxFuture<'_, io::Result<Self::UnixStream>> {
UnixStream::connect(path).map_ok(Compat::new).boxed()
}

#[doc(hidden)]
fn timeout_async<'a, F: Future + Send + 'a>(
fut: F,
timeout: Duration,
) -> BoxFuture<'a, Option<F::Output>> {
Box::pin(_tokio::time::timeout(timeout.into(), fut).map(Result::ok))
}
}

impl Async for Tokio {}
Expand Down
10 changes: 10 additions & 0 deletions sqlx/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ async-std = ["async", "sqlx-core/async-std"]
actix = ["async", "sqlx-core/actix"]
tokio = ["async", "sqlx-core/tokio"]

# Connection Pool
pool = ["crossbeam-queue", "parking_lot"]

# MySQL
mysql = ["sqlx-mysql"]
mysql-async = ["async", "mysql", "sqlx-mysql/async"]
Expand All @@ -38,8 +41,15 @@ postgres = ["sqlx-postgres"]
postgres-async = ["async", "postgres", "sqlx-postgres/async"]
postgres-blocking = ["blocking", "postgres", "sqlx-postgres/blocking"]


[dependencies]
sqlx-core = { version = "0.6.0-pre", path = "../sqlx-core" }
sqlx-mysql = { version = "0.6.0-pre", path = "../sqlx-mysql", optional = true }
sqlx-postgres = { version = "0.6.0-pre", path = "../sqlx-postgres", optional = true }
futures-util = { version = "0.3", optional = true, features = ["io"] }

crossbeam-queue = { version = "0.3.1", optional = true }
parking_lot = { version = "0.11", optional = true }

[dev-dependencies]
futures = "0.3.5"
3 changes: 3 additions & 0 deletions sqlx/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@
#[cfg(feature = "blocking")]
pub mod blocking;

#[cfg(feature = "pool")]
pub mod pool;

mod query;
mod query_as;
mod runtime;
Expand Down
79 changes: 79 additions & 0 deletions sqlx/src/pool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use crate::pool::connection::{Idle, Pooled};
use crate::pool::options::PoolOptions;
use crate::pool::shared::{SharedPool, TryAcquireResult};
use crate::pool::wait_list::WaitList;
use crate::{Connect, Connection, DefaultRuntime, Runtime};
use crossbeam_queue::ArrayQueue;
use std::sync::atomic::AtomicU32;
use std::sync::Arc;
use std::time::{Duration, Instant};

mod connection;
mod options;
mod shared;
mod wait_list;

pub struct Pool<Rt: Runtime, C: Connection<Rt>> {
shared: Arc<SharedPool<Rt, C>>,
}

impl<Rt: Runtime, C: Connection<Rt>> Pool<Rt, C> {
pub fn new(uri: &str) -> crate::Result<Self> {
Self::builder().build(uri)
}

pub fn new_with(connect_options: <C as Connect<Rt>>::Options) -> Self {
Self::builder().build_with(connect_options)
}

pub fn builder() -> PoolOptions<Rt, C> {
PoolOptions::new()
}
}

#[cfg(feature = "async")]
impl<Rt: crate::Async, C: Connection<Rt>> Pool<Rt, C> {
pub async fn connect(uri: &str) -> crate::Result<Self> {
Self::builder().connect(uri).await
}

pub async fn connect_with(connect_options: <C as Connect<Rt>>::Options) -> crate::Result<Self> {
Self::builder().connect_with(connect_options).await
}

pub async fn acquire(&self) -> crate::Result<Pooled<Rt, C>> {
if let Some(timeout) = self.shared.pool_options.acquire_timeout {
self.acquire_timeout(timeout).await
} else {
self.acquire_inner().await
}
}

pub async fn acquire_timeout(&self, timeout: Duration) -> crate::Result<Pooled<Rt, C>> {
Rt::timeout_async(timeout, self.acquire_inner())
.await
.ok_or(crate::Error::AcquireTimedOut)?
}

async fn acquire_inner(&self) -> crate::Result<Pooled<Rt, C>> {
let mut acquire_permit = None;

loop {
match self.shared.try_acquire(acquire_permit.take()) {
TryAcquireResult::Acquired(mut conn) => {
match self.shared.on_acquire_async(&mut conn) {
Ok(()) => return Ok(conn.attach(&self.shared)),
Err(e) => {
log::info!("error from before_acquire: {:?}", e);
}
}
}
TryAcquireResult::Connect(permit) => self.shared.connect_async(permit).await,
TryAcquireResult::Wait => {
acquire_permit = Some(self.shared.wait_async().await);
}
TryAcquireResult::PoolClosed => Err(crate::Error::Closed),
}
}
}
}
Loading

0 comments on commit d1b1b31

Please sign in to comment.