Skip to content

Commit

Permalink
Add a new optional parameter in BigTableConnection constructor for reading Service Worker JSON. (#63)
Browse files Browse the repository at this point in the history

* Add new BigTableConnection constructor for reading Service Worker json

* Created new function to allow file path

* Remove simulation from new_with_path function

* Remove whitespace

* Refactor to a single reusable function

* Remove unwanted use crate

* Add a short description to the function

* Add an example on how to use it, and shift emulator logic to new_with_auth_manager

* Remove unwanted use crate

* format code using cargo fmt
  • Loading branch information
kiv1 committed Mar 19, 2024
1 parent bbe3606 commit e8f351c
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 2 deletions.
39 changes: 37 additions & 2 deletions bigtable_rs/src/bigtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,43 @@ impl BigTableConnection {
is_read_only: bool,
channel_size: usize,
timeout: Option<Duration>,
) -> Result<Self> {
let authentication_manager = AuthenticationManager::new().await?;
Self::new_with_auth_manager(
project_id,
instance_name,
is_read_only,
channel_size,
timeout,
authentication_manager,
)
.await
}
/// Establish a connection to the BigTable instance named `instance_name`. If read-only access
/// is required, the `read_only` flag should be used to reduce the requested OAuth2 scope.
///
/// The `authentication_manager` variable will be used to determine the
/// program name that contains the BigTable instance in addition to access credentials.
///
///
/// `channel_size` defines the number of connections (or channels) established to Bigtable
/// service, and the requests are load balanced onto all the channels. You must therefore
/// make sure all of these connections are open when a new request is to be sent.
/// Idle connections are automatically closed in "a few minutes". Therefore it is important to
/// make sure you have a high enough QPS to send at least one request through all the
/// connections (in every service host) every minute. If not, you should consider decreasing the
/// channel size. If you are not sure what value to pick and your load is low, just start with 1.
/// The recommended value could be 2 x the thread count in your tokio environment see info here
/// https://docs.rs/tokio/latest/tokio/attr.main.html, but it might be a very different case for
/// different applications.
///
pub async fn new_with_auth_manager(
project_id: &str,
instance_name: &str,
is_read_only: bool,
channel_size: usize,
timeout: Option<Duration>,
authentication_manager: AuthenticationManager,
) -> Result<Self> {
match std::env::var("BIGTABLE_EMULATOR_HOST") {
Ok(endpoint) => {
Expand Down Expand Up @@ -247,8 +284,6 @@ impl BigTableConnection {
}

Err(_) => {
let authentication_manager = AuthenticationManager::new().await?;

let table_prefix = format!(
"projects/{}/instances/{}/tables/",
project_id, instance_name
Expand Down
5 changes: 5 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ path = "src/simple_write.rs"
name = "http_server"
path = "src/http_server/http_server.rs"

[[bin]]
name = "custom_path_connection"
path = "src/custom_path_connection.rs"

[[bin]]
name = "prefix"
path = "src/prefix.rs"
Expand All @@ -33,3 +37,4 @@ log = "0.4.20"
warp = "0.3.6"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
gcp_auth = "0.9.0"
88 changes: 88 additions & 0 deletions examples/src/custom_path_connection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
use bigtable_rs::bigtable;
use bigtable_rs::google::bigtable::v2::mutation;
use bigtable_rs::google::bigtable::v2::mutation::SetCell;
use bigtable_rs::google::bigtable::v2::row_filter::Filter;
use bigtable_rs::google::bigtable::v2::{
MutateRowRequest, Mutation, ReadRowsRequest, RowFilter, RowSet,
};
use env_logger;
use gcp_auth::{AuthenticationManager, CustomServiceAccount};
use std::error::Error;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
env_logger::init();

let project_id = "project-1";
let instance_name = "instance-1";
let table_name = "table-1";
let channel_size = 4;
let timeout = Duration::from_secs(10);

let key: String = "key3".to_owned();

let json_path: &str = "path/to/json";
// make a bigtable client
let connection = bigtable::BigTableConnection::new_with_auth_manager(
project_id,
instance_name,
false,
channel_size,
Some(timeout),
AuthenticationManager::from(CustomServiceAccount::from_file(json_path).unwrap()),
)
.await?;
let mut bigtable = connection.client();

let request = MutateRowRequest {
table_name: bigtable.get_full_table_name(table_name),
row_key: key.clone().into_bytes(),
mutations: vec![Mutation {
mutation: Some(mutation::Mutation::SetCell(SetCell {
family_name: "cf1".to_owned(),
column_qualifier: "c1".to_owned().into_bytes(),
timestamp_micros: -1, // IMPORTANT: Don't leave it empty. Use -1 for current Bigtable server time.
value: "a new write value".to_owned().into_bytes(),
})),
}],
..MutateRowRequest::default()
};

// write to table
let _response = bigtable.mutate_row(request).await?;

// read from table again
// prepare a ReadRowsRequest
let request = ReadRowsRequest {
table_name: bigtable.get_full_table_name(table_name),
rows_limit: 10,
rows: Some(RowSet {
row_keys: vec![key.clone().into_bytes()],
row_ranges: vec![],
}),
filter: Some(RowFilter {
filter: Some(Filter::CellsPerColumnLimitFilter(1)),
}),
..ReadRowsRequest::default()
};

// calling bigtable API to get results
let response = bigtable.read_rows(request).await?;

// simply print results for example usage
response.into_iter().for_each(|(key, data)| {
println!("------------\n{}", String::from_utf8(key.clone()).unwrap());
data.into_iter().for_each(|row_cell| {
println!(
" [{}:{}] \"{}\" @ {}",
row_cell.family_name,
String::from_utf8(row_cell.qualifier).unwrap(),
String::from_utf8(row_cell.value).unwrap(),
row_cell.timestamp_micros
)
})
});

Ok(())
}

0 comments on commit e8f351c

Please sign in to comment.