Skip to content

Commit

Permalink
Begin implementing the sniffing of nodes
Browse files Browse the repository at this point in the history
WIP
  • Loading branch information
srleyva committed Sep 21, 2020
1 parent 2705663 commit 4de791d
Showing 1 changed file with 18 additions and 3 deletions.
21 changes: 18 additions & 3 deletions elasticsearch/src/http/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use crate::{
response::Response,
Method,
},
nodes::{NodesInfo, NodesInfoParts},
};
use base64::write::EncoderWriter as Base64Encoder;
use bytes::BytesMut;
Expand Down Expand Up @@ -380,8 +381,11 @@ impl Transport {
Q: Serialize + ?Sized,
{
if self.conn_pool.reseedable() {
// Reseed nodes
println!("Reseeding!");
self.conn_pool.reseeding();
// NodesInfo::new(&self, NodesInfoParts::None)
// .send()
// .await
// .expect("Could not retrieve nodes for refresh");
}
let connection = self.conn_pool.next();
let url = connection.url.join(path.trim_start_matches('/'))?;
Expand Down Expand Up @@ -471,6 +475,9 @@ pub trait ConnectionPool: Debug + dyn_clone::DynClone + Sync + Send {
false
}

// NOOP
fn reseeding(&self) {}

// NOOP by default
fn reseed(&self, _connection: Vec<Connection>) {}
}
Expand Down Expand Up @@ -637,6 +644,7 @@ pub struct MultiNodeConnectionPool<TStrategy = RoundRobin> {

#[derive(Debug, Clone)]
pub struct MultiNodeConnectionPoolInner {
reseeding: bool,
last_update: Option<Instant>,
connections: Vec<Connection>,
}
Expand All @@ -660,14 +668,20 @@ where
.last_update
.as_ref()
.map(|last_update| last_update.elapsed() > wait);
last_update_is_stale.unwrap_or(true)
last_update_is_stale.unwrap_or(true) && !inner.reseeding
}

fn reseeding(&self) {
let mut inner = self.inner.write().expect("Lock Poisoned");
inner.reseeding = true
}

fn reseed(&self, mut connection: Vec<Connection>) {
let mut inner = self.inner.write().expect("lock poisoned");
inner.last_update = Some(Instant::now());
inner.connections.clear();
inner.connections.append(&mut connection);
inner.reseeding = false;
}
}

Expand All @@ -678,6 +692,7 @@ impl MultiNodeConnectionPool<RoundRobin> {

let inner: Arc<RwLock<MultiNodeConnectionPoolInner>> =
Arc::new(RwLock::new(MultiNodeConnectionPoolInner {
reseeding: false,
last_update: None,
connections,
}));
Expand Down

0 comments on commit 4de791d

Please sign in to comment.