Skip to content

Commit

Permalink
Begin implementing the sniffing of nodes
Browse files Browse the repository at this point in the history
* make review edits
  • Loading branch information
srleyva committed Sep 23, 2020
1 parent 2705663 commit fedd490
Showing 1 changed file with 103 additions and 24 deletions.
127 changes: 103 additions & 24 deletions elasticsearch/src/http/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use crate::{
use base64::write::EncoderWriter as Base64Encoder;
use bytes::BytesMut;
use serde::Serialize;
use serde_json::Value;
use std::{
error, fmt,
fmt::Debug,
Expand Down Expand Up @@ -288,7 +289,7 @@ impl Default for TransportBuilder {
/// A connection to an Elasticsearch node, used to send an API request
#[derive(Debug, Clone)]
pub struct Connection {
url: Url,
url: Arc<Url>,
}

impl Connection {
Expand All @@ -303,8 +304,14 @@ impl Connection {
url
};

let url = Arc::new(url);

Self { url }
}

pub fn url(&self) -> Arc<Url> {
self.url.clone()
}
}

/// A HTTP transport responsible for making the API requests to Elasticsearch,
Expand Down Expand Up @@ -365,27 +372,22 @@ impl Transport {
Ok(transport)
}

/// Creates an asynchronous request that can be awaited
pub async fn send<B, Q>(
pub fn request_builder<B, Q>(
&self,
connection: &Connection,
method: Method,
path: &str,
headers: HeaderMap,
query_string: Option<&Q>,
body: Option<B>,
timeout: Option<Duration>,
) -> Result<Response, Error>
) -> Result<reqwest::RequestBuilder, Error>
where
B: Body,
Q: Serialize + ?Sized,
{
if self.conn_pool.reseedable() {
// Reseed nodes
println!("Reseeding!");
}
let connection = self.conn_pool.next();
let url = connection.url.join(path.trim_start_matches('/'))?;
let reqwest_method = self.method(method);
let url = connection.url.join(path.trim_start_matches('/'))?;
let mut request_builder = self.client.request(reqwest_method, url);

if let Some(t) = timeout {
Expand Down Expand Up @@ -442,6 +444,70 @@ impl Transport {
if let Some(q) = query_string {
request_builder = request_builder.query(q);
}
Ok(request_builder)
}

/// Creates an asynchronous request that can be awaited
pub async fn send<B, Q>(
&self,
method: Method,
path: &str,
headers: HeaderMap,
query_string: Option<&Q>,
body: Option<B>,
timeout: Option<Duration>,
) -> Result<Response, Error>
where
B: Body,
Q: Serialize + ?Sized,
{
// Threads will execute against old connection pool during reseed
if self.conn_pool.reseedable() {
// Set as reseeding prevents another thread from attempting
// to reseed during es request and reseed
self.conn_pool.reseeding();

let connection = self.conn_pool.next();
let scheme = &connection.url.scheme();
// Build node info request
let node_request = self.request_builder(
&connection,
Method::Get,
"_nodes/_all/http",
headers.clone(),
None::<&Q>,
None::<B>,
timeout,
)?;
let resp = node_request.send().await?;
let json: Value = resp.json().await?;
let connections: Vec<Connection> = json["nodes"]
.as_object()
.unwrap()
.iter()
.map(|h| {
let url = format!(
"{}://{}",
scheme,
h.1["http"]["publish_address"].as_str().unwrap()
);
let url = Url::parse(&url).unwrap();
Connection::new(url)
})
.collect();
self.conn_pool.reseed(connections);
}

let connection = self.conn_pool.next();
let request_builder = self.request_builder(
&connection,
method,
path,
headers,
query_string,
body,
timeout,
)?;

let response = request_builder.send().await;
match response {
Expand Down Expand Up @@ -471,6 +537,9 @@ pub trait ConnectionPool: Debug + dyn_clone::DynClone + Sync + Send {
false
}

// NOOP
fn reseeding(&self) {}

// NOOP by default
fn reseed(&self, _connection: Vec<Connection>) {}
}
Expand Down Expand Up @@ -629,70 +698,80 @@ impl ConnectionPool for CloudConnectionPool {

/// A Connection Pool that manages a static connection of nodes
#[derive(Debug, Clone)]
pub struct MultiNodeConnectionPool<TStrategy = RoundRobin> {
pub struct MultiNodeConnectionPool<LoadBalancingStrategy = RoundRobin> {
inner: Arc<RwLock<MultiNodeConnectionPoolInner>>,
wait: Option<Duration>,
strategy: TStrategy,
reseed_frequency: Option<Duration>,
load_balancing_strategy: LoadBalancingStrategy,
}

#[derive(Debug, Clone)]
pub struct MultiNodeConnectionPoolInner {
reseeding: bool,
last_update: Option<Instant>,
connections: Vec<Connection>,
}

impl<TStrategy> ConnectionPool for MultiNodeConnectionPool<TStrategy>
where
TStrategy: Strategy + Clone,
TStrategy: LoadBalancingStrategy + Clone,
{
fn next(&self) -> Connection {
let inner = self.inner.read().expect("lock poisoned");
self.strategy.try_next(&inner.connections).unwrap()
self.load_balancing_strategy
.try_next(&inner.connections)
.unwrap()
}

fn reseedable(&self) -> bool {
let inner = self.inner.read().expect("lock poisoned");
let wait = match self.wait {
let reseed_frequency = match self.reseed_frequency {
Some(wait) => wait,
None => return false,
};
let last_update_is_stale = inner
.last_update
.as_ref()
.map(|last_update| last_update.elapsed() > wait);
last_update_is_stale.unwrap_or(true)
.map(|last_update| last_update.elapsed() > reseed_frequency);
last_update_is_stale.unwrap_or(true) && !inner.reseeding
}

fn reseeding(&self) {
let mut inner = self.inner.write().expect("Lock Poisoned");
inner.reseeding = true
}

fn reseed(&self, mut connection: Vec<Connection>) {
let mut inner = self.inner.write().expect("lock poisoned");
inner.last_update = Some(Instant::now());
inner.connections.clear();
inner.connections.append(&mut connection);
inner.reseeding = false;
}
}

impl MultiNodeConnectionPool<RoundRobin> {
/** Use a round-robin strategy for balancing traffic over the given set of nodes. */
pub fn round_robin(urls: Vec<Url>, wait: Option<Duration>) -> Self {
pub fn round_robin(urls: Vec<Url>, reseed_frequency: Option<Duration>) -> Self {
let connections = urls.into_iter().map(Connection::new).collect();

let inner: Arc<RwLock<MultiNodeConnectionPoolInner>> =
Arc::new(RwLock::new(MultiNodeConnectionPoolInner {
reseeding: false,
last_update: None,
connections,
}));

let strategy = RoundRobin::default();
let load_balancing_strategy = RoundRobin::default();
Self {
inner,
strategy,
wait,
load_balancing_strategy,
reseed_frequency,
}
}
}

/** The strategy selects an address from a given collection. */
pub trait Strategy: Send + Sync + Debug {
pub trait LoadBalancingStrategy: Send + Sync + Debug {
/** Try get the next connection. */
fn try_next<'a>(&self, connections: &'a [Connection]) -> Result<Connection, Error>;
}
Expand All @@ -711,7 +790,7 @@ impl Default for RoundRobin {
}
}

impl Strategy for RoundRobin {
impl LoadBalancingStrategy for RoundRobin {
fn try_next<'a>(&self, connections: &'a [Connection]) -> Result<Connection, Error> {
if connections.is_empty() {
Err(crate::error::lib("Connection list empty"))
Expand Down

0 comments on commit fedd490

Please sign in to comment.