Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support connecting to emulator over unix domain socket #72

Merged
merged 2 commits into from
Apr 14, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 54 additions & 27 deletions bigtable_rs/src/bigtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ use std::time::Duration;
use gcp_auth::AuthenticationManager;
use log::info;
use thiserror::Error;
use tokio::net::UnixStream;
use tonic::transport::Endpoint;
use tonic::{codec::Streaming, transport::Channel, transport::ClientTlsConfig, Response};
use tower::ServiceBuilder;
Expand Down Expand Up @@ -223,7 +224,6 @@ impl BigTableConnection {
project_id,
instance_name,
is_read_only,
channel_size,
timeout,
),

Expand Down Expand Up @@ -272,7 +272,6 @@ impl BigTableConnection {
project_id,
instance_name,
is_read_only,
channel_size,
timeout,
),

Expand Down Expand Up @@ -314,44 +313,71 @@ impl BigTableConnection {
})
.collect();

// construct a channel, by balancing over all endpoints.
let channel = Channel::balance_list(endpoints.into_iter());

let auth_manager = Some(Arc::new(authentication_manager));
Ok(Self {
client: create_client(endpoints, auth_manager, is_read_only),
client: create_client(channel, auth_manager, is_read_only),
table_prefix: Arc::new(table_prefix),
timeout: Arc::new(timeout),
})
}
}
}

fn new_with_emulator(
/// Establish a connection to a BigTable emulator at [emulator_endpoint].
/// This is usually covered by [Self::new] or [Self::new_with_auth_manager],
/// which both support the `BIGTABLE_EMULATOR_HOST` env variable. However,
/// this function can also be used directly, in case setting
/// `BIGTABLE_EMULATOR_HOST` is inconvenient.
pub fn new_with_emulator(
flokli marked this conversation as resolved.
Show resolved Hide resolved
emulator_endpoint: &str,
project_id: &str,
instance_name: &str,
is_read_only: bool,
channel_size: usize,
timeout: Option<Duration>,
) -> Result<Self> {
info!("Connecting to bigtable emulator at {}", emulator_endpoint);
let endpoints: Vec<Endpoint> = vec![0; channel_size.max(1)]
.iter()
.map(move |_| {
Channel::from_shared(format!("http://{}", emulator_endpoint))
.expect("Invalid connection emulator uri")
.http2_keep_alive_interval(Duration::from_secs(60))
.keep_alive_while_idle(true)
})
.map(|ep| {
if let Some(timeout) = timeout {
ep.timeout(timeout)
} else {
ep
}
})
.collect();

// configures the endpoint with the specified parameters
fn configure_endpoint(endpoint: Endpoint, timeout: Option<Duration>) -> Endpoint {
let endpoint = endpoint
.http2_keep_alive_interval(Duration::from_secs(60))
.keep_alive_while_idle(true);

if let Some(timeout) = timeout {
endpoint.timeout(timeout)
} else {
endpoint
}
}

// Parse emulator_endpoint. Officially, it's only host:port,
// but unix:///path/to/unix.sock also works in the Go SDK at least.
// Having the emulator listen on unix domain sockets without ip2unix is
// covered in https://github.com/googleapis/google-cloud-go/pull/9665.
let channel = if let Some(path) = emulator_endpoint.strip_prefix("unix://") {
// the URL doesn't matter, we use a custom connector.
let endpoint = Endpoint::from_static("http://[::]:50051");
let endpoint = configure_endpoint(endpoint, timeout);

let path: String = path.to_string();
let connector = tower::service_fn({
move |_: tonic::transport::Uri| UnixStream::connect(path.clone())
});

endpoint.connect_with_connector_lazy(connector)
flokli marked this conversation as resolved.
Show resolved Hide resolved
} else {
let endpoint = Channel::from_shared(format!("http://{}", emulator_endpoint))
.expect("invalid connection emulator uri");
let endpoint = configure_endpoint(endpoint, timeout);

endpoint.connect_lazy()
flokli marked this conversation as resolved.
Show resolved Hide resolved
};

Ok(Self {
client: create_client(endpoints, None, is_read_only),
client: create_client(channel, None, is_read_only),
table_prefix: Arc::new(format!(
"projects/{}/instances/{}/tables/",
project_id, instance_name
Expand All @@ -374,19 +400,20 @@ impl BigTableConnection {
}

/// Helper function to create a BigtableClient<AuthSvc>
/// from a channel.
fn create_client(
endpoints: Vec<Endpoint>,
channel: Channel,
authentication_manager: Option<Arc<AuthenticationManager>>,
read_only: bool,
) -> BigtableClient<AuthSvc> {
let scopes = if read_only {
"https://www.googleapis.com/auth/bigtable.data.readonly".to_string()
"https://www.googleapis.com/auth/bigtable.data.readonly"
} else {
"https://www.googleapis.com/auth/bigtable.data".to_string()
"https://www.googleapis.com/auth/bigtable.data"
};
let channel = Channel::balance_list(endpoints.into_iter());

let auth_svc = ServiceBuilder::new()
.layer_fn(|c| AuthSvc::new(c, authentication_manager.clone(), scopes.clone()))
.layer_fn(|c| AuthSvc::new(c, authentication_manager.clone(), scopes.to_string()))
.service(channel);
return BigtableClient::new(auth_svc);
}
Expand Down
Loading