Skip to content

Commit

Permalink
Other deps build
Browse files Browse the repository at this point in the history
  • Loading branch information
jamesmunns committed Jun 24, 2024
1 parent 2facd68 commit d0f8e86
Show file tree
Hide file tree
Showing 8 changed files with 48 additions and 46 deletions.
5 changes: 4 additions & 1 deletion pingora-load-balancing/src/background.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@

//! Implement [BackgroundService] for [LoadBalancer]

use std::{hash::Hash, time::{Duration, Instant}};
use std::{
hash::Hash,
time::{Duration, Instant},
};

use super::{BackendIter, BackendSelection, LoadBalancer};
use async_trait::async_trait;
Expand Down
2 changes: 1 addition & 1 deletion pingora-load-balancing/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ where

impl<M> Static<M>
where
M: Ord + Clone
M: Ord + Clone,
{
/// Create a new boxed [Static] service discovery with the given backends.
pub fn new(backends: BTreeSet<Backend<M>>) -> Box<Self> {
Expand Down
1 change: 0 additions & 1 deletion pingora-load-balancing/src/health_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ where
}

impl<M> TcpHealthCheck<M> {

/// Create a new [TcpHealthCheck] that tries to establish a TLS connection.
///
/// The default values are the same as [Self::new()].
Expand Down
53 changes: 26 additions & 27 deletions pingora-load-balancing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,6 @@ pub struct Backend<M> {
pub metadata: M,
}

impl Backend<()> {
/// Create a new [Backend] with `weight` 1. The function will try to parse
/// `addr` into a [std::net::SocketAddr].
pub fn new(addr: &str) -> Result<Self> {
Self::new_with_meta(addr, ())
}
}

impl<M> Backend<M>
where
M: Hash,
Expand All @@ -90,20 +82,22 @@ impl<M> Backend<M> {
})
// TODO: UDS
}
}

impl<M> std::ops::Deref for Backend<M> {
type Target = SocketAddr;

fn deref(&self) -> &Self::Target {
pub fn addr(&self) -> &SocketAddr {
&self.addr
}
}

impl<M> std::ops::DerefMut for Backend<M> {
fn deref_mut(&mut self) -> &mut Self::Target {
pub fn addr_mut(&mut self) -> &mut SocketAddr {
&mut self.addr
}

pub fn metadata(&self) -> &M {
&self.metadata
}

pub fn metadata_mut(&mut self) -> &mut M {
&mut self.metadata
}
}

impl<M> std::net::ToSocketAddrs for Backend<M> {
Expand Down Expand Up @@ -153,7 +147,11 @@ where
M: PartialEq + Hash,
{
/// Return true when the new is different from the current set of backends
fn do_update(&self, new_backends: BTreeSet<Backend<M>>, enablement: HashMap<u64, bool>) -> bool {
fn do_update(
&self,
new_backends: BTreeSet<Backend<M>>,
enablement: HashMap<u64, bool>,
) -> bool {
if (**self.backends.load()) != new_backends {
let old_health = self.health.load();
let mut health = HashMap::with_capacity(new_backends.len());
Expand Down Expand Up @@ -326,7 +324,7 @@ where
///
/// Note: [ToSocketAddrs] will invoke blocking network IO for DNS lookup if
/// the input cannot be directly parsed as [SocketAddr].
pub fn try_from_iter<A, T: IntoIterator<Item = A>>(iter: T) -> IoResult<Self>
pub fn try_from_iter_default_meta<A, T: IntoIterator<Item = A>>(iter: T) -> IoResult<Self>
where
A: ToSocketAddrs,
{
Expand Down Expand Up @@ -426,7 +424,6 @@ where
S: BackendSelection<Metadata = M> + 'static,
S::Iter: BackendIter<Metadata = M>,
{

/// Set the health check method. See [health_check].
pub fn set_health_check(
&mut self,
Expand All @@ -451,7 +448,7 @@ mod test {
#[tokio::test]
async fn test_static_backends() {
let backends: LoadBalancer<selection::RoundRobin<_>, _> =
LoadBalancer::try_from_iter(["1.1.1.1:80", "1.0.0.1:80"]).unwrap();
LoadBalancer::try_from_iter_default_meta(["1.1.1.1:80", "1.0.0.1:80"]).unwrap();

let backend1 = Backend::new_with_meta("1.1.1.1:80", u32::default()).unwrap();
let backend2 = Backend::new_with_meta("1.0.0.1:80", u32::default()).unwrap();
Expand All @@ -463,11 +460,11 @@ mod test {
#[tokio::test]
async fn test_backends() {
let discovery = discovery::Static::default();
let good1 = Backend::new("1.1.1.1:80").unwrap();
let good1 = Backend::new_with_meta("1.1.1.1:80", 101u32).unwrap();
discovery.add(good1.clone());
let good2 = Backend::new("1.0.0.1:80").unwrap();
let good2 = Backend::new_with_meta("1.0.0.1:80", 102u32).unwrap();
discovery.add(good2.clone());
let bad = Backend::new("127.0.0.1:79").unwrap();
let bad = Backend::new_with_meta("127.0.0.1:79", 404u32).unwrap();
discovery.add(bad.clone());

let mut backends = Backends::new(Box::new(discovery));
Expand Down Expand Up @@ -501,7 +498,9 @@ mod test {
impl ServiceDiscovery for TestDiscovery {
type Metadata = TestMetadata;

async fn discover(&self) -> Result<(BTreeSet<Backend<TestMetadata>>, HashMap<u64, bool>)> {
async fn discover(
&self,
) -> Result<(BTreeSet<Backend<TestMetadata>>, HashMap<u64, bool>)> {
let bad = Backend::new_with_meta("127.0.0.1:79", 3u32).unwrap();
let (backends, mut readiness) = self.0.discover().await?;
readiness.insert(bad.hash_key(), false);
Expand Down Expand Up @@ -533,11 +532,11 @@ mod test {
#[tokio::test]
async fn test_parallel_health_check() {
let discovery = discovery::Static::default();
let good1 = Backend::new("1.1.1.1:80").unwrap();
let good1 = Backend::new_with_meta("1.1.1.1:80", 100u32).unwrap();
discovery.add(good1.clone());
let good2 = Backend::new("1.0.0.1:80").unwrap();
let good2 = Backend::new_with_meta("1.0.0.1:80", 200u32).unwrap();
discovery.add(good2.clone());
let bad = Backend::new("127.0.0.1:79").unwrap();
let bad = Backend::new_with_meta("127.0.0.1:79", 404u32).unwrap();
discovery.add(bad.clone());

let mut backends = Backends::new(Box::new(discovery));
Expand Down
6 changes: 3 additions & 3 deletions pingora-load-balancing/src/selection/consistent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,9 @@ mod test {

#[test]
fn test_ketama() {
let b1 = Backend::new("1.1.1.1:80").unwrap();
let b2 = Backend::new("1.0.0.1:80").unwrap();
let b3 = Backend::new("1.0.0.255:80").unwrap();
let b1 = Backend::new_with_meta("1.1.1.1:80", 200u32).unwrap();
let b2 = Backend::new_with_meta("1.0.0.1:80", 300u32).unwrap();
let b3 = Backend::new_with_meta("1.0.0.255:80", 400u32).unwrap();
let backends = BTreeSet::from_iter([b1.clone(), b2.clone(), b3.clone()]);
let hash = Arc::new(KetamaHashing::build(&backends));

Expand Down
6 changes: 3 additions & 3 deletions pingora-load-balancing/src/selection/weighted.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,10 @@ mod test {

#[test]
fn test_random() {
let b1 = Backend::new("1.1.1.1:80").unwrap();
let mut b2 = Backend::new("1.0.0.1:80").unwrap();
let b1 = Backend::new_with_meta("1.1.1.1:80", 100u32).unwrap();
let mut b2 = Backend::new_with_meta("1.0.0.1:80", 100u32).unwrap();
b2.weight = 8; // 8x than the rest
let b3 = Backend::new("1.0.0.255:80").unwrap();
let b3 = Backend::new_with_meta("1.0.0.255:80", 100u32).unwrap();
let backends = BTreeSet::from_iter([b1.clone(), b2.clone(), b3.clone()]);
let hash: Arc<Weighted<_, Random>> = Arc::new(Weighted::build(&backends));

Expand Down
4 changes: 2 additions & 2 deletions pingora-proxy/examples/load_balancer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use pingora_core::Result;
use pingora_load_balancing::{health_check, selection::RoundRobin, LoadBalancer};
use pingora_proxy::{ProxyHttp, Session};

pub struct LB(Arc<LoadBalancer<RoundRobin>>);
pub struct LB(Arc<LoadBalancer<RoundRobin<()>, ()>>);

#[async_trait]
impl ProxyHttp for LB {
Expand Down Expand Up @@ -68,7 +68,7 @@ fn main() {

// 127.0.0.1:343" is just a bad server
let mut upstreams =
LoadBalancer::try_from_iter(["1.1.1.1:443", "1.0.0.1:443", "127.0.0.1:343"]).unwrap();
LoadBalancer::try_from_iter_default_meta(["1.1.1.1:443", "1.0.0.1:443", "127.0.0.1:343"]).unwrap();

// We add health check in the background so that the bad server is never selected.
let hc = health_check::TcpHealthCheck::new();
Expand Down
17 changes: 9 additions & 8 deletions pingora-proxy/examples/multi_lb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use pingora_load_balancing::{
use pingora_proxy::{http_proxy_service, ProxyHttp, Session};

struct Router {
cluster_one: Arc<LoadBalancer<RoundRobin>>,
cluster_two: Arc<LoadBalancer<RoundRobin>>,
cluster_one: Arc<LoadBalancer<RoundRobin<()>, ()>>,
cluster_two: Arc<LoadBalancer<RoundRobin<()>, ()>>,
}

#[async_trait]
Expand Down Expand Up @@ -53,12 +53,13 @@ impl ProxyHttp for Router {
}
}

fn build_cluster_service<S>(upstreams: &[&str]) -> GenBackgroundService<LoadBalancer<S>>
fn build_cluster_service<S, M>(upstreams: &[&str]) -> GenBackgroundService<LoadBalancer<S, M>>
where
S: BackendSelection + 'static,
S::Iter: BackendIter,
S: BackendSelection<Metadata = M> + 'static,
S::Iter: BackendIter<Metadata = M>,
M: Clone + Default + Send + Sync + std::hash::Hash + Ord + 'static,
{
let mut cluster = LoadBalancer::try_from_iter(upstreams).unwrap();
let mut cluster = LoadBalancer::try_from_iter_default_meta(upstreams).unwrap();
cluster.set_health_check(TcpHealthCheck::new());
cluster.health_check_frequency = Some(std::time::Duration::from_secs(1));

Expand All @@ -73,8 +74,8 @@ fn main() {
my_server.bootstrap();

// build multiple clusters
let cluster_one = build_cluster_service::<RoundRobin>(&["1.1.1.1:443", "127.0.0.1:343"]);
let cluster_two = build_cluster_service::<RoundRobin>(&["1.0.0.1:443", "127.0.0.2:343"]);
let cluster_one = build_cluster_service::<RoundRobin<_>, ()>(&["1.1.1.1:443", "127.0.0.1:343"]);
let cluster_two = build_cluster_service::<RoundRobin<_>, ()>(&["1.0.0.1:443", "127.0.0.2:343"]);

let router = Router {
cluster_one: cluster_one.task(),
Expand Down

0 comments on commit d0f8e86

Please sign in to comment.