diff --git a/bigtable_rs/src/bigtable.rs b/bigtable_rs/src/bigtable.rs index fb98ad9..5c23f08 100644 --- a/bigtable_rs/src/bigtable.rs +++ b/bigtable_rs/src/bigtable.rs @@ -215,6 +215,43 @@ impl BigTableConnection { is_read_only: bool, channel_size: usize, timeout: Option, + ) -> Result { + let authentication_manager = AuthenticationManager::new().await?; + Self::new_with_auth_manager( + project_id, + instance_name, + is_read_only, + channel_size, + timeout, + authentication_manager, + ) + .await + } + /// Establish a connection to the BigTable instance named `instance_name`. If read-only access + /// is required, the `read_only` flag should be used to reduce the requested OAuth2 scope. + /// + /// The `authentication_manager` variable will be used to determine the + /// program name that contains the BigTable instance in addition to access credentials. + /// + /// + /// `channel_size` defines the number of connections (or channels) established to Bigtable + /// service, and the requests are load balanced onto all the channels. You must therefore + /// make sure all of these connections are open when a new request is to be sent. + /// Idle connections are automatically closed in "a few minutes". Therefore it is important to + /// make sure you have a high enough QPS to send at least one request through all the + /// connections (in every service host) every minute. If not, you should consider decreasing the + /// channel size. If you are not sure what value to pick and your load is low, just start with 1. + /// The recommended value could be 2 x the thread count in your tokio environment see info here + /// https://docs.rs/tokio/latest/tokio/attr.main.html, but it might be a very different case for + /// different applications. + /// + pub async fn new_with_auth_manager( + project_id: &str, + instance_name: &str, + is_read_only: bool, + channel_size: usize, + timeout: Option, + authentication_manager: AuthenticationManager, ) -> Result { match std::env::var("BIGTABLE_EMULATOR_HOST") { Ok(endpoint) => { @@ -247,8 +284,6 @@ impl BigTableConnection { } Err(_) => { - let authentication_manager = AuthenticationManager::new().await?; - let table_prefix = format!( "projects/{}/instances/{}/tables/", project_id, instance_name diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 0fb0243..e6dead4 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -21,6 +21,10 @@ path = "src/simple_write.rs" name = "http_server" path = "src/http_server/http_server.rs" +[[bin]] +name = "custom_path_connection" +path = "src/custom_path_connection.rs" + [[bin]] name = "prefix" path = "src/prefix.rs" @@ -33,3 +37,4 @@ log = "0.4.20" warp = "0.3.6" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" +gcp_auth = "0.9.0" diff --git a/examples/src/custom_path_connection.rs b/examples/src/custom_path_connection.rs new file mode 100644 index 0000000..55f157b --- /dev/null +++ b/examples/src/custom_path_connection.rs @@ -0,0 +1,88 @@ +use bigtable_rs::bigtable; +use bigtable_rs::google::bigtable::v2::mutation; +use bigtable_rs::google::bigtable::v2::mutation::SetCell; +use bigtable_rs::google::bigtable::v2::row_filter::Filter; +use bigtable_rs::google::bigtable::v2::{ + MutateRowRequest, Mutation, ReadRowsRequest, RowFilter, RowSet, +}; +use env_logger; +use gcp_auth::{AuthenticationManager, CustomServiceAccount}; +use std::error::Error; +use std::time::Duration; + +#[tokio::main] +async fn main() -> Result<(), Box> { + env_logger::init(); + + let project_id = "project-1"; + let instance_name = "instance-1"; + let table_name = "table-1"; + let channel_size = 4; + let timeout = Duration::from_secs(10); + + let key: String = "key3".to_owned(); + + let json_path: &str = "path/to/json"; + // make a bigtable client + let connection = bigtable::BigTableConnection::new_with_auth_manager( + project_id, + instance_name, + false, + channel_size, + Some(timeout), + AuthenticationManager::from(CustomServiceAccount::from_file(json_path).unwrap()), + ) + .await?; + let mut bigtable = connection.client(); + + let request = MutateRowRequest { + table_name: bigtable.get_full_table_name(table_name), + row_key: key.clone().into_bytes(), + mutations: vec![Mutation { + mutation: Some(mutation::Mutation::SetCell(SetCell { + family_name: "cf1".to_owned(), + column_qualifier: "c1".to_owned().into_bytes(), + timestamp_micros: -1, // IMPORTANT: Don't leave it empty. Use -1 for current Bigtable server time. + value: "a new write value".to_owned().into_bytes(), + })), + }], + ..MutateRowRequest::default() + }; + + // write to table + let _response = bigtable.mutate_row(request).await?; + + // read from table again + // prepare a ReadRowsRequest + let request = ReadRowsRequest { + table_name: bigtable.get_full_table_name(table_name), + rows_limit: 10, + rows: Some(RowSet { + row_keys: vec![key.clone().into_bytes()], + row_ranges: vec![], + }), + filter: Some(RowFilter { + filter: Some(Filter::CellsPerColumnLimitFilter(1)), + }), + ..ReadRowsRequest::default() + }; + + // calling bigtable API to get results + let response = bigtable.read_rows(request).await?; + + // simply print results for example usage + response.into_iter().for_each(|(key, data)| { + println!("------------\n{}", String::from_utf8(key.clone()).unwrap()); + data.into_iter().for_each(|row_cell| { + println!( + " [{}:{}] \"{}\" @ {}", + row_cell.family_name, + String::from_utf8(row_cell.qualifier).unwrap(), + String::from_utf8(row_cell.value).unwrap(), + row_cell.timestamp_micros + ) + }) + }); + + Ok(()) +}