Skip to content

Commit

Permalink
Add regex parsing bound_address to URL
Browse files Browse the repository at this point in the history
* Style changes per code review
  • Loading branch information
srleyva committed Sep 24, 2020
1 parent 214ef0a commit 50f8084
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 17 deletions.
2 changes: 2 additions & 0 deletions elasticsearch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ bytes = "^0.5"
dyn-clone = "~1"
percent-encoding = "2.1.0"
reqwest = { version = "~0.10", default-features = false, features = ["gzip", "json"] }
lazy_static = "^1.4"
url = "^2.1"
regex = "1.3"
serde = { version = "~1", features = ["derive"] }
serde_json = "~1"
serde_with = "~1"
Expand Down
84 changes: 67 additions & 17 deletions elasticsearch/src/http/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use crate::{
};
use base64::write::EncoderWriter as Base64Encoder;
use bytes::BytesMut;
use regex::Regex;
use serde::Serialize;
use serde_json::Value;
use std::{
Expand Down Expand Up @@ -101,6 +102,10 @@ impl fmt::Display for BuildError {

/// Default address to Elasticsearch running on `http://localhost:9200`
pub static DEFAULT_ADDRESS: &str = "http://localhost:9200";
lazy_static! {
static ref ADDRESS_REGEX: Regex =
Regex::new(r"((?P<fqdn>[^/]+)/)?(?P<ip>[^:]+|\[[\da-fA-F:\.]+\]):(?P<port>\d+)$").unwrap();
}

/// Builds a HTTP transport to make API calls to Elasticsearch
pub struct TransportBuilder {
Expand Down Expand Up @@ -389,7 +394,7 @@ impl Transport {
Ok(transport)
}

pub fn request_builder<B, Q>(
fn request_builder<B, Q>(
&self,
connection: &Connection,
method: Method,
Expand Down Expand Up @@ -464,6 +469,28 @@ impl Transport {
Ok(request_builder)
}

fn parse_to_url(address: &str, scheme: &str) -> Result<Url, Error> {
if address.is_empty() {
return Err(crate::error::lib("Bound Address is empty"));
}

let matches = ADDRESS_REGEX
.captures(address)
.ok_or_else(|| crate::lib(format!("error parsing address into url: {}", address)))?;

let host = matches
.name("fqdn")
.or_else(|| Some(matches.name("ip").unwrap()))
.unwrap()
.as_str()
.trim();
let port = matches.name("port").unwrap().as_str().trim();

Ok(Url::parse(
format!("{}://{}:{}", scheme, host, port).as_str(),
)?)
}

/// Creates an asynchronous request that can be awaited
pub async fn send<B, Q>(
&self,
Expand All @@ -486,7 +513,7 @@ impl Transport {
let node_request = self.request_builder(
&connection,
Method::Get,
"_nodes/_all/http",
"_nodes/http?filter_path=nodes.*.http",
headers.clone(),
None::<&Q>,
None::<B>,
Expand All @@ -499,12 +526,17 @@ impl Transport {
.unwrap()
.iter()
.map(|h| {
let url = format!(
"{}://{}",
scheme,
h.1["http"]["publish_address"].as_str().unwrap()
);
let url = Url::parse(&url).unwrap();
let address = h.1["http"]["publish_address"]
.as_str()
.or_else(|| {
Some(
h.1["http"]["bound_address"].as_array().unwrap()[0]
.as_str()
.unwrap(),
)
})
.unwrap();
let url = Self::parse_to_url(address, scheme).unwrap();
Connection::new(url)
})
.collect();
Expand Down Expand Up @@ -708,10 +740,10 @@ impl ConnectionPool for CloudConnectionPool {

/// A Connection Pool that manages a static connection of nodes
#[derive(Debug, Clone)]
pub struct MultiNodeConnectionPool<LoadBalancing = RoundRobin> {
pub struct MultiNodeConnectionPool<ConnSelector = RoundRobin> {
inner: Arc<RwLock<MultiNodeConnectionPoolInner>>,
reseed_frequency: Option<Duration>,
load_balancing_strategy: LoadBalancing,
load_balancing_strategy: ConnSelector,
reseeding: Arc<AtomicBool>,
}

Expand All @@ -721,9 +753,9 @@ pub struct MultiNodeConnectionPoolInner {
connections: Vec<Connection>,
}

impl<LoadBalancing> ConnectionPool for MultiNodeConnectionPool<LoadBalancing>
impl<ConnSelector> ConnectionPool for MultiNodeConnectionPool<ConnSelector>
where
LoadBalancing: LoadBalancingStrategy + Clone,
ConnSelector: ConnectionSelector + Clone,
{
fn next(&self) -> Connection {
let inner = self.inner.read().expect("lock poisoned");
Expand Down Expand Up @@ -787,9 +819,9 @@ impl MultiNodeConnectionPool<RoundRobin> {
}

/** The strategy selects an address from a given collection. */
pub trait LoadBalancingStrategy: Send + Sync + Debug {
pub trait ConnectionSelector: Send + Sync + Debug {
/** Try get the next connection. */
fn try_next<'a>(&self, connections: &'a [Connection]) -> Result<Connection, Error>;
fn try_next(&self, connections: &[Connection]) -> Result<Connection, Error>;
}

/** A round-robin strategy cycles through nodes sequentially. */
Expand All @@ -806,8 +838,8 @@ impl Default for RoundRobin {
}
}

impl LoadBalancingStrategy for RoundRobin {
fn try_next<'a>(&self, connections: &'a [Connection]) -> Result<Connection, Error> {
impl ConnectionSelector for RoundRobin {
fn try_next(&self, connections: &[Connection]) -> Result<Connection, Error> {
if connections.is_empty() {
Err(crate::error::lib("Connection list empty"))
} else {
Expand All @@ -823,7 +855,7 @@ pub mod tests {
use crate::auth::ClientCertificate;
use crate::http::transport::{
CloudId, Connection, ConnectionPool, MultiNodeConnectionPool, SingleNodeConnectionPool,
TransportBuilder,
Transport, TransportBuilder,
};
use std::{
sync::atomic::Ordering,
Expand Down Expand Up @@ -853,6 +885,24 @@ pub mod tests {
assert!(res.is_err());
}

#[test]
fn test_url_parsing_where_hostname_and_ip_present() {
let url = Transport::parse_to_url("localhost/127.0.0.1:9200", "http").unwrap();
assert_eq!(url.into_string(), "http://localhost:9200/");
}

#[test]
fn test_url_parsing_where_only_ip_present() {
let url = Transport::parse_to_url("127.0.0.1:9200", "http").unwrap();
assert_eq!(url.into_string(), "http://127.0.0.1:9200/");
}

#[test]
fn test_url_parsing_where_only_hostname_present() {
let url = Transport::parse_to_url("localhost:9200", "http").unwrap();
assert_eq!(url.into_string(), "http://localhost:9200/");
}

#[test]
fn can_parse_cloud_id() {
let base64 = base64::encode("cloud-endpoint.example$3dadf823f05388497ea684236d918a1a$3f26e1609cf54a0f80137a80de560da4");
Expand Down
3 changes: 3 additions & 0 deletions elasticsearch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,9 @@ type _DoctestReadme = ();
#[macro_use]
extern crate dyn_clone;

#[macro_use]
extern crate lazy_static;

pub mod auth;
pub mod cert;
pub mod http;
Expand Down

0 comments on commit 50f8084

Please sign in to comment.