Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[client] support get progress values from client #147

Open
wants to merge 2 commits into
base: async-await
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ version = "1.6"
optional = true

[dependencies.clickhouse-rs-cityhash-sys]
path = "clickhouse-rs-cityhash-sys"
version = "0.1.2"

[dependencies.log]
Expand Down
25 changes: 14 additions & 11 deletions examples/simple.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{env, error::Error};
use clickhouse_rs::{row, types::Block, Pool};
use futures_util::StreamExt;
use std::{env, error::Error};

async fn execute(database_url: String) -> Result<(), Box<dyn Error>> {
env::set_var("RUST_LOG", "clickhouse_rs=debug");
Expand All @@ -25,16 +25,20 @@ async fn execute(database_url: String) -> Result<(), Box<dyn Error>> {
let mut client = pool.get_handle().await?;
client.execute(ddl).await?;
client.insert("payment", block).await?;
let mut stream = client.query("SELECT * FROM payment").stream();

while let Some(row) = stream.next().await {
let row = row?;
let id: u32 = row.get("customer_id")?;
let amount: u32 = row.get("amount")?;
let name: Option<&str> = row.get("account_name")?;
println!("Found payment {}: {} {:?}", id, amount, name);
{
let mut stream = client.query("SELECT * FROM payment").stream();
while let Some(row) = stream.next().await {
let row = row?;
let id: u32 = row.get("customer_id")?;
let amount: u32 = row.get("amount")?;
let name: Option<&str> = row.get("account_name")?;
println!("Found payment {}: {} {:?}", id, amount, name);
}
}

let progress = client.progress();
println!("Progress results: {:?}", progress);
Ok(())
}

Expand All @@ -49,9 +53,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
#[cfg(all(feature = "tokio_io", feature = "tls"))]
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let database_url = env::var("DATABASE_URL").unwrap_or_else(|_| {
"tcp://localhost:9440?secure=true&skip_verify=true".into()
});
let database_url = env::var("DATABASE_URL")
.unwrap_or_else(|_| "tcp://localhost:9440?secure=true&skip_verify=true".into());
execute(database_url).await
}

Expand Down
7 changes: 7 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ use futures_util::{
future, future::BoxFuture, future::FutureExt, stream, stream::BoxStream, StreamExt,
};
use log::{info, warn};
use types::Progress;

use crate::{
connecting_stream::ConnectingStream,
Expand Down Expand Up @@ -238,6 +239,7 @@ pub struct ClientHandle {
inner: Option<ClickhouseTransport>,
context: Context,
pool: PoolBinding,
progress: Progress,
}

impl fmt::Debug for ClientHandle {
Expand Down Expand Up @@ -288,6 +290,7 @@ impl Client {
None => PoolBinding::None,
Some(p) => PoolBinding::Detached(p),
},
progress: Progress::default(),
};

handle.hello().await?;
Expand Down Expand Up @@ -553,6 +556,10 @@ impl ClientHandle {
unreachable!()
}
}

pub fn progress(&self) -> &Progress {
&self.progress
}
}

fn column_name_to_string(name: &str) -> Result<String> {
Expand Down
23 changes: 13 additions & 10 deletions src/pool/mod.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
use std::{
fmt, mem, pin::Pin,
sync::Arc,
fmt, mem,
pin::Pin,
sync::atomic::{self, Ordering},
sync::Arc,
task::{Context, Poll, Waker},
};

use futures_util::future::BoxFuture;
use log::error;

use crate::{
Client,
ClientHandle,
errors::Result, types::{IntoOptions, OptionsSource},
errors::Result,
types::{IntoOptions, OptionsSource, Progress},
Client, ClientHandle,
};

pub use self::futures::GetHandle;
Expand Down Expand Up @@ -215,16 +216,16 @@ impl Pool {
match new.poll_unpin(cx) {
Poll::Ready(Ok(client)) => {
self.inner.idle.push(client).unwrap();
},
}
Poll::Pending => {
// NOTE: it is okay to drop the construction task
// because another construction will be attempted
// later in Pool::poll
let _ = self.inner.new.push(new);
},
}
Poll::Ready(Err(err)) => {
return Err(err);
},
}
}
}

Expand Down Expand Up @@ -277,6 +278,7 @@ impl Drop for ClientHandle {
let client = Self {
inner: Some(inner),
pool: pool.clone(),
progress: Progress::default(),
context,
};
pool.return_conn(client);
Expand All @@ -294,7 +296,7 @@ mod test {

use futures_util::future;

use crate::{Block, errors::Result, Options, test_misc::DATABASE_URL};
use crate::{errors::Result, test_misc::DATABASE_URL, Block, Options};

use super::Pool;
use url::Url;
Expand Down Expand Up @@ -404,7 +406,8 @@ mod test {

#[test]
fn test_get_addr() {
let options = Options::from_str("tcp://host1:9000?alt_hosts=host2:9000,host3:9000").unwrap();
let options =
Options::from_str("tcp://host1:9000?alt_hosts=host2:9000,host3:9000").unwrap();
let pool = Pool::new(options);

assert_eq!(pool.get_addr(), &Url::from_str("tcp://host1:9000").unwrap());
Expand Down
28 changes: 22 additions & 6 deletions src/types/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{borrow::Cow, collections::HashMap, fmt, mem, pin::Pin, sync::Mutex, str::FromStr};
use std::{borrow::Cow, collections::HashMap, fmt, mem, pin::Pin, str::FromStr, sync::Mutex};

use chrono::prelude::*;
use chrono_tz::Tz;
Expand All @@ -10,15 +10,15 @@ use crate::errors::ServerError;

pub use self::{
block::{Block, RCons, RNil, Row, RowBuilder, Rows},
column::{Column, ColumnType, Simple, Complex},
column::{Column, ColumnType, Complex, Simple},
decimal::Decimal,
enums::{Enum16, Enum8},
from_sql::{FromSql, FromSqlResult},
value_ref::ValueRef,
options::Options,
query::Query,
query_result::QueryResult,
value::Value,
value_ref::ValueRef,
};

pub(crate) use self::{
Expand Down Expand Up @@ -51,12 +51,26 @@ mod enums;
mod options;

#[derive(Copy, Clone, Debug, Default, PartialEq)]
pub(crate) struct Progress {
pub struct Progress {
pub rows: u64,
pub bytes: u64,
pub total_rows: u64,
}

impl Progress {
pub(crate) fn update(&mut self, progress: Progress) {
self.rows += progress.rows;
self.bytes += progress.bytes;
self.total_rows += progress.total_rows;
}

pub(crate) fn reset(&mut self) {
self.rows = 0;
self.bytes = 0;
self.total_rows = 0;
}
}

#[derive(Copy, Clone, Default, Debug, PartialEq)]
pub(crate) struct ProfileInfo {
pub rows: u64,
Expand Down Expand Up @@ -200,7 +214,7 @@ has_sql_type! {
pub enum DateTimeType {
DateTime32,
DateTime64(u32, Tz),
Chrono
Chrono,
}

#[derive(Debug, Copy, Clone, PartialOrd, Eq, PartialEq, Hash)]
Expand Down Expand Up @@ -351,7 +365,9 @@ impl SqlType {
SqlType::Float32 => "Float32".into(),
SqlType::Float64 => "Float64".into(),
SqlType::Date => "Date".into(),
SqlType::DateTime(DateTimeType::DateTime64(precision, tz)) => format!("DateTime64({}, '{:?}')", precision, tz).into(),
SqlType::DateTime(DateTimeType::DateTime64(precision, tz)) => {
format!("DateTime64({}, '{:?}')", precision, tz).into()
}
SqlType::DateTime(_) => "DateTime".into(),
SqlType::Ipv4 => "IPv4".into(),
SqlType::Ipv6 => "IPv6".into(),
Expand Down
5 changes: 3 additions & 2 deletions src/types/query_result/mod.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use std::{marker::PhantomData, sync::Arc};
use futures_util::stream::BoxStream;
use futures_util::{
future,
stream::{self, StreamExt},
TryStreamExt,
};
use log::info;
use std::{marker::PhantomData, sync::Arc};

use crate::{
try_opt,
errors::Result,
try_opt,
types::{
block::BlockRef, query_result::stream_blocks::BlockStream, Block, Cmd, Complex, Query, Row,
Rows, Simple,
Expand Down Expand Up @@ -93,6 +93,7 @@ impl<'a> QueryResult<'a> {

let inner = c.inner.take().unwrap().call(Cmd::SendQuery(query, context));

c.progress.reset();
BlockStream::<'a>::new(c, inner, skip_first_block)
})
}
Expand Down
13 changes: 10 additions & 3 deletions src/types/query_result/stream_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@ impl<'a> Drop for BlockStream<'a> {
}

impl<'a> BlockStream<'a> {
pub(crate) fn new(client: &mut ClientHandle, inner: PacketStream, skip_first_block: bool) -> BlockStream {
pub(crate) fn new(
client: &mut ClientHandle,
inner: PacketStream,
skip_first_block: bool,
) -> BlockStream {
BlockStream {
client,
inner,
Expand Down Expand Up @@ -75,10 +79,13 @@ impl<'a> Stream for BlockStream<'a> {
}
self.eof = true;
}
Packet::ProfileInfo(_) | Packet::Progress(_) => {}
Packet::ProfileInfo(_) => {}
Packet::Exception(exception) => {
self.eof = true;
return Poll::Ready(Some(Err(Error::Server(exception))))
return Poll::Ready(Some(Err(Error::Server(exception))));
}
Packet::Progress(progress) => {
self.client.progress.update(progress);
}
Packet::Block(block) => {
self.block_index += 1;
Expand Down