diff --git a/pingora-load-balancing/src/background.rs b/pingora-load-balancing/src/background.rs index 5a2eebb0..9ae7882f 100644 --- a/pingora-load-balancing/src/background.rs +++ b/pingora-load-balancing/src/background.rs @@ -14,7 +14,10 @@ //! Implement [BackgroundService] for [LoadBalancer] -use std::{hash::Hash, time::{Duration, Instant}}; +use std::{ + hash::Hash, + time::{Duration, Instant}, +}; use super::{BackendIter, BackendSelection, LoadBalancer}; use async_trait::async_trait; diff --git a/pingora-load-balancing/src/discovery.rs b/pingora-load-balancing/src/discovery.rs index 91431389..40d1cc22 100644 --- a/pingora-load-balancing/src/discovery.rs +++ b/pingora-load-balancing/src/discovery.rs @@ -69,7 +69,7 @@ where impl Static where - M: Ord + Clone + M: Ord + Clone, { /// Create a new boxed [Static] service discovery with the given backends. pub fn new(backends: BTreeSet>) -> Box { diff --git a/pingora-load-balancing/src/health_check.rs b/pingora-load-balancing/src/health_check.rs index a902af6a..c7cc0f50 100644 --- a/pingora-load-balancing/src/health_check.rs +++ b/pingora-load-balancing/src/health_check.rs @@ -89,7 +89,6 @@ where } impl TcpHealthCheck { - /// Create a new [TcpHealthCheck] that tries to establish a TLS connection. /// /// The default values are the same as [Self::new()]. diff --git a/pingora-load-balancing/src/lib.rs b/pingora-load-balancing/src/lib.rs index 7a773861..1d80f77b 100644 --- a/pingora-load-balancing/src/lib.rs +++ b/pingora-load-balancing/src/lib.rs @@ -56,14 +56,6 @@ pub struct Backend { pub metadata: M, } -impl Backend<()> { - /// Create a new [Backend] with `weight` 1. The function will try to parse - /// `addr` into a [std::net::SocketAddr]. - pub fn new(addr: &str) -> Result { - Self::new_with_meta(addr, ()) - } -} - impl Backend where M: Hash, @@ -90,20 +82,22 @@ impl Backend { }) // TODO: UDS } -} - -impl std::ops::Deref for Backend { - type Target = SocketAddr; - fn deref(&self) -> &Self::Target { + pub fn addr(&self) -> &SocketAddr { &self.addr } -} -impl std::ops::DerefMut for Backend { - fn deref_mut(&mut self) -> &mut Self::Target { + pub fn addr_mut(&mut self) -> &mut SocketAddr { &mut self.addr } + + pub fn metadata(&self) -> &M { + &self.metadata + } + + pub fn metadata_mut(&mut self) -> &mut M { + &mut self.metadata + } } impl std::net::ToSocketAddrs for Backend { @@ -153,7 +147,11 @@ where M: PartialEq + Hash, { /// Return true when the new is different from the current set of backends - fn do_update(&self, new_backends: BTreeSet>, enablement: HashMap) -> bool { + fn do_update( + &self, + new_backends: BTreeSet>, + enablement: HashMap, + ) -> bool { if (**self.backends.load()) != new_backends { let old_health = self.health.load(); let mut health = HashMap::with_capacity(new_backends.len()); @@ -326,7 +324,7 @@ where /// /// Note: [ToSocketAddrs] will invoke blocking network IO for DNS lookup if /// the input cannot be directly parsed as [SocketAddr]. - pub fn try_from_iter>(iter: T) -> IoResult + pub fn try_from_iter_default_meta>(iter: T) -> IoResult where A: ToSocketAddrs, { @@ -426,7 +424,6 @@ where S: BackendSelection + 'static, S::Iter: BackendIter, { - /// Set the health check method. See [health_check]. pub fn set_health_check( &mut self, @@ -451,7 +448,7 @@ mod test { #[tokio::test] async fn test_static_backends() { let backends: LoadBalancer, _> = - LoadBalancer::try_from_iter(["1.1.1.1:80", "1.0.0.1:80"]).unwrap(); + LoadBalancer::try_from_iter_default_meta(["1.1.1.1:80", "1.0.0.1:80"]).unwrap(); let backend1 = Backend::new_with_meta("1.1.1.1:80", u32::default()).unwrap(); let backend2 = Backend::new_with_meta("1.0.0.1:80", u32::default()).unwrap(); @@ -463,11 +460,11 @@ mod test { #[tokio::test] async fn test_backends() { let discovery = discovery::Static::default(); - let good1 = Backend::new("1.1.1.1:80").unwrap(); + let good1 = Backend::new_with_meta("1.1.1.1:80", 101u32).unwrap(); discovery.add(good1.clone()); - let good2 = Backend::new("1.0.0.1:80").unwrap(); + let good2 = Backend::new_with_meta("1.0.0.1:80", 102u32).unwrap(); discovery.add(good2.clone()); - let bad = Backend::new("127.0.0.1:79").unwrap(); + let bad = Backend::new_with_meta("127.0.0.1:79", 404u32).unwrap(); discovery.add(bad.clone()); let mut backends = Backends::new(Box::new(discovery)); @@ -501,7 +498,9 @@ mod test { impl ServiceDiscovery for TestDiscovery { type Metadata = TestMetadata; - async fn discover(&self) -> Result<(BTreeSet>, HashMap)> { + async fn discover( + &self, + ) -> Result<(BTreeSet>, HashMap)> { let bad = Backend::new_with_meta("127.0.0.1:79", 3u32).unwrap(); let (backends, mut readiness) = self.0.discover().await?; readiness.insert(bad.hash_key(), false); @@ -533,11 +532,11 @@ mod test { #[tokio::test] async fn test_parallel_health_check() { let discovery = discovery::Static::default(); - let good1 = Backend::new("1.1.1.1:80").unwrap(); + let good1 = Backend::new_with_meta("1.1.1.1:80", 100u32).unwrap(); discovery.add(good1.clone()); - let good2 = Backend::new("1.0.0.1:80").unwrap(); + let good2 = Backend::new_with_meta("1.0.0.1:80", 200u32).unwrap(); discovery.add(good2.clone()); - let bad = Backend::new("127.0.0.1:79").unwrap(); + let bad = Backend::new_with_meta("127.0.0.1:79", 404u32).unwrap(); discovery.add(bad.clone()); let mut backends = Backends::new(Box::new(discovery)); diff --git a/pingora-load-balancing/src/selection/consistent.rs b/pingora-load-balancing/src/selection/consistent.rs index 4feb8b0a..dcbca184 100644 --- a/pingora-load-balancing/src/selection/consistent.rs +++ b/pingora-load-balancing/src/selection/consistent.rs @@ -85,9 +85,9 @@ mod test { #[test] fn test_ketama() { - let b1 = Backend::new("1.1.1.1:80").unwrap(); - let b2 = Backend::new("1.0.0.1:80").unwrap(); - let b3 = Backend::new("1.0.0.255:80").unwrap(); + let b1 = Backend::new_with_meta("1.1.1.1:80", 200u32).unwrap(); + let b2 = Backend::new_with_meta("1.0.0.1:80", 300u32).unwrap(); + let b3 = Backend::new_with_meta("1.0.0.255:80", 400u32).unwrap(); let backends = BTreeSet::from_iter([b1.clone(), b2.clone(), b3.clone()]); let hash = Arc::new(KetamaHashing::build(&backends)); diff --git a/pingora-load-balancing/src/selection/weighted.rs b/pingora-load-balancing/src/selection/weighted.rs index f58f6bcf..b3e07c7a 100644 --- a/pingora-load-balancing/src/selection/weighted.rs +++ b/pingora-load-balancing/src/selection/weighted.rs @@ -192,10 +192,10 @@ mod test { #[test] fn test_random() { - let b1 = Backend::new("1.1.1.1:80").unwrap(); - let mut b2 = Backend::new("1.0.0.1:80").unwrap(); + let b1 = Backend::new_with_meta("1.1.1.1:80", 100u32).unwrap(); + let mut b2 = Backend::new_with_meta("1.0.0.1:80", 100u32).unwrap(); b2.weight = 8; // 8x than the rest - let b3 = Backend::new("1.0.0.255:80").unwrap(); + let b3 = Backend::new_with_meta("1.0.0.255:80", 100u32).unwrap(); let backends = BTreeSet::from_iter([b1.clone(), b2.clone(), b3.clone()]); let hash: Arc> = Arc::new(Weighted::build(&backends)); diff --git a/pingora-proxy/examples/load_balancer.rs b/pingora-proxy/examples/load_balancer.rs index 614981d6..e13b30f3 100644 --- a/pingora-proxy/examples/load_balancer.rs +++ b/pingora-proxy/examples/load_balancer.rs @@ -25,7 +25,7 @@ use pingora_core::Result; use pingora_load_balancing::{health_check, selection::RoundRobin, LoadBalancer}; use pingora_proxy::{ProxyHttp, Session}; -pub struct LB(Arc>); +pub struct LB(Arc, ()>>); #[async_trait] impl ProxyHttp for LB { @@ -68,7 +68,7 @@ fn main() { // 127.0.0.1:343" is just a bad server let mut upstreams = - LoadBalancer::try_from_iter(["1.1.1.1:443", "1.0.0.1:443", "127.0.0.1:343"]).unwrap(); + LoadBalancer::try_from_iter_default_meta(["1.1.1.1:443", "1.0.0.1:443", "127.0.0.1:343"]).unwrap(); // We add health check in the background so that the bad server is never selected. let hc = health_check::TcpHealthCheck::new(); diff --git a/pingora-proxy/examples/multi_lb.rs b/pingora-proxy/examples/multi_lb.rs index 1321c207..289b7dad 100644 --- a/pingora-proxy/examples/multi_lb.rs +++ b/pingora-proxy/examples/multi_lb.rs @@ -24,8 +24,8 @@ use pingora_load_balancing::{ use pingora_proxy::{http_proxy_service, ProxyHttp, Session}; struct Router { - cluster_one: Arc>, - cluster_two: Arc>, + cluster_one: Arc, ()>>, + cluster_two: Arc, ()>>, } #[async_trait] @@ -53,12 +53,13 @@ impl ProxyHttp for Router { } } -fn build_cluster_service(upstreams: &[&str]) -> GenBackgroundService> +fn build_cluster_service(upstreams: &[&str]) -> GenBackgroundService> where - S: BackendSelection + 'static, - S::Iter: BackendIter, + S: BackendSelection + 'static, + S::Iter: BackendIter, + M: Clone + Default + Send + Sync + std::hash::Hash + Ord + 'static, { - let mut cluster = LoadBalancer::try_from_iter(upstreams).unwrap(); + let mut cluster = LoadBalancer::try_from_iter_default_meta(upstreams).unwrap(); cluster.set_health_check(TcpHealthCheck::new()); cluster.health_check_frequency = Some(std::time::Duration::from_secs(1)); @@ -73,8 +74,8 @@ fn main() { my_server.bootstrap(); // build multiple clusters - let cluster_one = build_cluster_service::(&["1.1.1.1:443", "127.0.0.1:343"]); - let cluster_two = build_cluster_service::(&["1.0.0.1:443", "127.0.0.2:343"]); + let cluster_one = build_cluster_service::, ()>(&["1.1.1.1:443", "127.0.0.1:343"]); + let cluster_two = build_cluster_service::, ()>(&["1.0.0.1:443", "127.0.0.2:343"]); let router = Router { cluster_one: cluster_one.task(),