Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: lb: PeakEWMA load balancer #32942

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions envoy/stats/primitive_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
#include "envoy/stats/tag.h"

#include "source/common/common/assert.h"
#include "source/common/common/lock_guard.h"
#include "source/common/common/non_copyable.h"
#include "source/common/common/thread.h"

#include "absl/strings/string_view.h"

Expand Down Expand Up @@ -107,5 +109,33 @@ class PrimitiveGaugeSnapshot : public PrimitiveMetricMetadata {
const uint64_t value_;
};

class PrimitiveEwma : NonCopyable {
public:
PrimitiveEwma() = default;

double value() const { return value_; }

double decay_value() {
update(0);
return value_;
}

void update(double v) {
Thread::LockGuard guard_(mutex_);
long now = time(nullptr);
value_ = v * alpha_ + pow(1 - alpha_, now - last_tick_) * value_;
last_tick_ = now;
}

private:
const double alpha_ = 1 - exp(-5 / 60 / 1); // for one-minute moving average

Thread::MutexBasicLockable mutex_;
std::atomic<double> value_{0};
time_t last_tick_{time(nullptr)};
};

using PrimitiveEwmaReference = std::reference_wrapper<PrimitiveEwma>;

} // namespace Stats
} // namespace Envoy
2 changes: 2 additions & 0 deletions envoy/stats/primitive_stats_macros.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ namespace Envoy {
// Fully-qualified for use in external callsites.
#define GENERATE_PRIMITIVE_COUNTER_STRUCT(NAME) Envoy::Stats::PrimitiveCounter NAME##_;
#define GENERATE_PRIMITIVE_GAUGE_STRUCT(NAME) Envoy::Stats::PrimitiveGauge NAME##_;
#define GENERATE_PRIMITIVE_EWMA_STRUCT(NAME) Envoy::Stats::PrimitiveEwma NAME##_;

// Name and counter/gauge reference pair used to construct map of counters/gauges.
#define PRIMITIVE_COUNTER_NAME_AND_REFERENCE(X) {absl::string_view(#X), std::ref(X##_)},
Expand All @@ -50,5 +51,6 @@ namespace Envoy {
// Ignore a counter or gauge.
#define IGNORE_PRIMITIVE_COUNTER(X)
#define IGNORE_PRIMITIVE_GAUGE(X)
#define IGNORE_PRIMITIVE_EWMA(X)

} // namespace Envoy
16 changes: 11 additions & 5 deletions envoy/upstream/host_description.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,30 +29,36 @@ using MetadataConstSharedPtr = std::shared_ptr<const envoy::config::core::v3::Me
* envoy.api.v2.endpoint.UpstreamLocalityStats for the definitions of success/error. These are
* latched by LoadStatsReporter, independent of the normal stats sink flushing.
*/
#define ALL_HOST_STATS(COUNTER, GAUGE) \
#define ALL_HOST_STATS(COUNTER, GAUGE, EWMA) \
COUNTER(cx_connect_fail) \
COUNTER(cx_total) \
COUNTER(rq_error) \
COUNTER(rq_success) \
COUNTER(rq_timeout) \
COUNTER(rq_total) \
GAUGE(cx_active) \
GAUGE(rq_active)
GAUGE(rq_active) \
EWMA(decay_rtt) \
EWMA(decay_err) \
EWMA(decay_total)

/**
* All per host stats defined. @see stats_macros.h
*/
struct HostStats {
ALL_HOST_STATS(GENERATE_PRIMITIVE_COUNTER_STRUCT, GENERATE_PRIMITIVE_GAUGE_STRUCT);
ALL_HOST_STATS(GENERATE_PRIMITIVE_COUNTER_STRUCT, GENERATE_PRIMITIVE_GAUGE_STRUCT,
GENERATE_PRIMITIVE_EWMA_STRUCT);

// Provide access to name,counter pairs.
std::vector<std::pair<absl::string_view, Stats::PrimitiveCounterReference>> counters() {
return {ALL_HOST_STATS(PRIMITIVE_COUNTER_NAME_AND_REFERENCE, IGNORE_PRIMITIVE_GAUGE)};
return {ALL_HOST_STATS(PRIMITIVE_COUNTER_NAME_AND_REFERENCE, IGNORE_PRIMITIVE_GAUGE,
IGNORE_PRIMITIVE_EWMA)};
}

// Provide access to name,gauge pairs.
std::vector<std::pair<absl::string_view, Stats::PrimitiveGaugeReference>> gauges() {
return {ALL_HOST_STATS(IGNORE_PRIMITIVE_COUNTER, PRIMITIVE_GAUGE_NAME_AND_REFERENCE)};
return {ALL_HOST_STATS(IGNORE_PRIMITIVE_COUNTER, PRIMITIVE_GAUGE_NAME_AND_REFERENCE,
IGNORE_PRIMITIVE_EWMA)};
}
};

Expand Down
47 changes: 47 additions & 0 deletions source/common/upstream/load_balancer_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1430,5 +1430,52 @@ HostConstSharedPtr RandomLoadBalancer::peekOrChoose(LoadBalancerContext* context
return hosts_to_use[random_hash % hosts_to_use.size()];
}

HostConstSharedPtr PeakEWMALoadBalancer::chooseHostOnce(LoadBalancerContext* context) {
const absl::optional<HostsSource> hosts_source = hostSourceToUse(context, random(false));
if (!hosts_source) {
return nullptr;
}

const HostVector& hosts_to_use = hostSourceToHosts(*hosts_source);
if (hosts_to_use.empty()) {
return nullptr;
}

HostSharedPtr candidate_host = nullptr;
// TODO(jizhuozhi): maybe this should be configurable, but there seems to be no problem so far.
for (uint32_t choice_idx = 0; choice_idx < 2; ++choice_idx) {
const int rand_idx = random_.random() % hosts_to_use.size();
HostSharedPtr sampled_host = hosts_to_use[rand_idx];

if (candidate_host == nullptr) {
// Make a first choice to start the comparisons.
candidate_host = sampled_host;
continue;
}

const double candidate_score = score(candidate_host);
const double sampled_score = score(sampled_host);
if (candidate_score < sampled_score) {
candidate_host = sampled_host;
}
}
return candidate_host;
}

double PeakEWMALoadBalancer::score(HostSharedPtr host) {
double active_req = host->stats().rq_active_.value();
double weight = host->weight();
double decay_rtt = host->stats().decay_rtt_.decay_value();
double decay_err = host->stats().decay_err_.decay_value();
double decay_total = host->stats().decay_total_.decay_value();

// TODO(jizhuozhi): maybe this should be configurable, but there seems to be no problem so far.
if (decay_rtt == 0) {
decay_rtt = DEFAULT_RTT;
}

return ((active_req + 1) / weight) * decay_rtt / (1 - (decay_err / (decay_total + 1)));
}

} // namespace Upstream
} // namespace Envoy
21 changes: 21 additions & 0 deletions source/common/upstream/load_balancer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -784,5 +784,26 @@ class RandomLoadBalancer : public ZoneAwareLoadBalancerBase {
HostConstSharedPtr peekOrChoose(LoadBalancerContext* context, bool peek);
};

class PeakEWMALoadBalancer : public ZoneAwareLoadBalancerBase {
public:
~PeakEWMALoadBalancer() = default;
HostConstSharedPtr chooseHostOnce(LoadBalancerContext* context) override;

PeakEWMALoadBalancer(const PrioritySet& priority_set, const PrioritySet* local_priority_set,
ClusterLbStats& stats, Runtime::Loader& runtime,
Random::RandomGenerator& random,
const envoy::config::cluster::v3::Cluster::CommonLbConfig& common_config)
: ZoneAwareLoadBalancerBase(
priority_set, local_priority_set, stats, runtime, random,
PROTOBUF_PERCENT_TO_ROUNDED_INTEGER_OR_DEFAULT(common_config, healthy_panic_threshold,
100, 50),
LoadBalancerConfigHelper::localityLbConfigFromCommonLbConfig(common_config)) {}

private:
constexpr static const double DEFAULT_RTT = 100; // 100ms as default for new instance

double score(HostSharedPtr host);
};

} // namespace Upstream
} // namespace Envoy
Loading