Skip to content

Commit

Permalink
WIP feat: reintroduce Pool
Browse files Browse the repository at this point in the history
Signed-off-by: Austin Bonander <austin@launchbadge.com>
  • Loading branch information
abonander committed Apr 29, 2021
1 parent e7664d8 commit 95d39be
Show file tree
Hide file tree
Showing 11 changed files with 1,278 additions and 0 deletions.
27 changes: 27 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 30 additions & 0 deletions script/enforce-new-mod-style.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#!/usr/bin/env bash

# This script scans the project for `mod.rs` files and exits with a nonzero code if it finds any.
#
# You can also call it with `--fix` to replace any `mod.rs` files with their 2018 edition equivalents.
# The new files will be staged for commit for convenience.

FILES=$(find ./ -name mod.rs -print)

if [[ -z $FILES ]]; then
exit 0
fi

if [ "$1" != "--fix" ]; then
echo 'This project uses the Rust 2018 module style. mod.rs files are forbidden.'
echo "Execute \`$0 --fix\` to replace these with their 2018 equivalents and stage for commit."
echo 'Found mod.rs files:'
echo "$FILES"
exit 1
fi

echo 'Fixing Rust 2018 Module Style'

while read -r file; do
dest="$(dirname $file).rs"
echo "$file -> $dest"
mv $file $dest
git add $dest
done <<< $FILES

11 changes: 11 additions & 0 deletions sqlx-core/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ mod tokio_;
pub use actix_::Actix;
#[cfg(feature = "async-std")]
pub use async_std_::AsyncStd;
use std::future::Future;
use std::time::Instant;
#[cfg(feature = "tokio")]
pub use tokio_::Tokio;

Expand Down Expand Up @@ -82,6 +84,15 @@ pub trait Runtime: 'static + Send + Sync + Sized + Debug {
fn connect_unix_async(path: &Path) -> BoxFuture<'_, io::Result<Self::UnixStream>>
where
Self: Async;

#[doc(hidden)]
#[cfg(all(unix, feature = "async"))]
fn timeout_at_async<'a, F: Future + Send + 'a>(
fut: F,
deadline: Instant,
) -> BoxFuture<'a, Option<F::Output>>
where
Self: Async;
}

/// Marks a [`Runtime`] as being capable of handling asynchronous execution.
Expand Down
10 changes: 10 additions & 0 deletions sqlx-core/src/runtime/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ use futures_util::{AsyncReadExt, AsyncWriteExt, FutureExt, TryFutureExt};

use crate::io::Stream;
use crate::{Async, Runtime};
use std::future::Future;
use std::time::Instant;

/// Provides [`Runtime`] for [**Tokio**](https://tokio.rs). Supports only non-blocking operation.
///
Expand Down Expand Up @@ -55,6 +57,14 @@ impl Runtime for Tokio {
fn connect_unix_async(path: &Path) -> BoxFuture<'_, io::Result<Self::UnixStream>> {
UnixStream::connect(path).map_ok(Compat::new).boxed()
}

#[doc(hidden)]
fn timeout_at_async<'a, F: Future + Send + 'a>(
fut: F,
deadline: Instant,
) -> BoxFuture<'a, Option<F::Output>> {
Box::pin(_tokio::time::timeout_at(deadline.into(), fut).map(Result::ok))
}
}

impl Async for Tokio {}
Expand Down
10 changes: 10 additions & 0 deletions sqlx/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ async-std = ["async", "sqlx-core/async-std"]
actix = ["async", "sqlx-core/actix"]
tokio = ["async", "sqlx-core/tokio"]

# Connection Pool
pool = ["crossbeam-queue", "parking_lot"]

# MySQL
mysql = ["sqlx-mysql"]
mysql-async = ["async", "mysql", "sqlx-mysql/async"]
Expand All @@ -38,8 +41,15 @@ postgres = ["sqlx-postgres"]
postgres-async = ["async", "postgres", "sqlx-postgres/async"]
postgres-blocking = ["blocking", "postgres", "sqlx-postgres/blocking"]


[dependencies]
sqlx-core = { version = "0.6.0-pre", path = "../sqlx-core" }
sqlx-mysql = { version = "0.6.0-pre", path = "../sqlx-mysql", optional = true }
sqlx-postgres = { version = "0.6.0-pre", path = "../sqlx-postgres", optional = true }
futures-util = { version = "0.3", optional = true, features = ["io"] }

crossbeam-queue = { version = "0.3.1", optional = true }
parking_lot = { version = "0.11", optional = true }

[dev-dependencies]
futures = "0.3.5"
3 changes: 3 additions & 0 deletions sqlx/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@
#[cfg(feature = "blocking")]
pub mod blocking;

#[cfg(feature = "pool")]
pub mod pool;

mod query;
mod query_as;
mod runtime;
Expand Down
65 changes: 65 additions & 0 deletions sqlx/src/pool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use crate::pool::connection::{Idle, Pooled};
use crate::pool::options::PoolOptions;
use crate::pool::shared::{SharedPool, TryAcquireResult};
use crate::pool::wait_list::WaitList;
use crate::{Connect, Connection, DefaultRuntime, Runtime};
use crossbeam_queue::ArrayQueue;
use std::sync::atomic::AtomicU32;
use std::sync::Arc;
use std::time::Instant;

mod connection;
mod options;
mod shared;
mod wait_list;

pub struct Pool<Rt: Runtime, C: Connection<Rt>> {
shared: Arc<SharedPool<Rt, C>>,
}

impl<Rt: Runtime, C: Connection<Rt>> Pool<Rt, C> {
pub fn new(uri: &str) -> crate::Result<Self> {
Self::builder().build(uri)
}

pub fn new_with(connect_options: <C as Connect<Rt>>::Options) -> Self {
Self::builder().build_with(connect_options)
}

pub fn builder() -> PoolOptions<Rt, C> {
PoolOptions::new()
}
}

#[cfg(feature = "async")]
impl<Rt: crate::Async, C: Connection<Rt>> Pool<Rt, C> {
pub async fn connect(uri: &str) -> crate::Result<Self> {
Self::builder().connect(uri).await
}

pub async fn connect_with(connect_options: <C as Connect<Rt>>::Options) -> crate::Result<Self> {
Self::builder().connect_with(connect_options).await
}

pub async fn acquire(&self) -> crate::Result<Pooled<Rt, C>> {}

async fn acquire_inner(&self, deadline: Option<Instant>) -> crate::Result<Pooled<Rt, C>> {
let mut acquire_permit = None;

loop {
match self.shared.try_acquire(acquire_permit.take()) {
TryAcquireResult::Acquired(mut conn) => {
match self.shared.on_acquire_async(&mut conn) {
Ok(()) => return Ok(conn.attach(&self.shared)),
Err(e) => {
log::info!("error from before_acquire: {:?}", e);
}
}
}
TryAcquireResult::Connect(permit) => self.shared.connect_async(permit).await,
TryAcquireResult::Wait => {}
TryAcquireResult::PoolClosed => Err(todo!("crate::Error::PoolClosed")),
}
}
}
}
Loading

0 comments on commit 95d39be

Please sign in to comment.