Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion contrib/peak_ewma/load_balancing_policies/source/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Upstream::LoadBalancerPtr PeakEwmaCreator::operator()(
priority_set, nullptr, cluster_info.lbStats(), runtime, random,
PROTOBUF_PERCENT_TO_ROUNDED_INTEGER_OR_DEFAULT(cluster_info.lbConfig(),
healthy_panic_threshold, 100, 50),
cluster_info, time_source, config->lb_config_, config->main_dispatcher_);
cluster_info, time_source, config->lb_config_);
}

/**
Expand Down
11 changes: 3 additions & 8 deletions contrib/peak_ewma/load_balancing_policies/source/config.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
#pragma once

#include "envoy/event/dispatcher.h"
#include "envoy/thread_local/thread_local.h"
#include "envoy/upstream/load_balancer.h"

#include "source/common/common/logger.h"
Expand All @@ -20,11 +18,9 @@ using PeakEwmaLbProto = envoy::extensions::load_balancing_policies::peak_ewma::v

class TypedPeakEwmaLbConfig : public Upstream::LoadBalancerConfig {
public:
TypedPeakEwmaLbConfig(const PeakEwmaLbProto& lb_config, Event::Dispatcher& main_dispatcher)
: lb_config_(lb_config), main_dispatcher_(main_dispatcher) {}
explicit TypedPeakEwmaLbConfig(const PeakEwmaLbProto& lb_config) : lb_config_(lb_config) {}

PeakEwmaLbProto lb_config_;
Event::Dispatcher& main_dispatcher_;
};

struct PeakEwmaCreator : public Logger::Loggable<Logger::Id::upstream> {
Expand All @@ -41,11 +37,10 @@ class Factory
Factory() : FactoryBase("envoy.load_balancing_policies.peak_ewma") {}

absl::StatusOr<Upstream::LoadBalancerConfigPtr>
loadConfig(Server::Configuration::ServerFactoryContext& context,
loadConfig(Server::Configuration::ServerFactoryContext&,
const Protobuf::Message& config) override {
const auto& typed_config = dynamic_cast<const PeakEwmaLbProto&>(config);
return Upstream::LoadBalancerConfigPtr{
new TypedPeakEwmaLbConfig(typed_config, context.mainThreadDispatcher())};
return Upstream::LoadBalancerConfigPtr{new TypedPeakEwmaLbConfig(typed_config)};
}
};

Expand Down
101 changes: 34 additions & 67 deletions contrib/peak_ewma/load_balancing_policies/source/peak_ewma_lb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,18 @@ PeakEwmaLoadBalancer::PeakEwmaLoadBalancer(
Upstream::ClusterLbStats& /*stats*/, Runtime::Loader& runtime, Random::RandomGenerator& random,
uint32_t /* healthy_panic_threshold */, const Upstream::ClusterInfo& cluster_info,
TimeSource& time_source,
const envoy::extensions::load_balancing_policies::peak_ewma::v3alpha::PeakEwma& config,
Event::Dispatcher& main_dispatcher)
const envoy::extensions::load_balancing_policies::peak_ewma::v3alpha::PeakEwma& config)
: LoadBalancerBase(priority_set, cluster_info.lbStats(), runtime, random,
PROTOBUF_PERCENT_TO_ROUNDED_INTEGER_OR_DEFAULT(
cluster_info.lbConfig(), healthy_panic_threshold, 100, 50)),
priority_set_(priority_set), config_proto_(config), random_(random),
time_source_(time_source), stats_scope_(cluster_info.statsScope()),
cost_(config.has_penalty_value() ? config.penalty_value().value() : 1000000.0),
main_dispatcher_(main_dispatcher),
aggregation_interval_(config_proto_.has_aggregation_interval()
? std::chrono::milliseconds(DurationUtil::durationToMilliseconds(
config_proto_.aggregation_interval()))
: std::chrono::milliseconds(100)),
last_aggregation_time_(time_source_.monotonicTime()),
tau_nanos_(config_proto_.has_decay_time()
? DurationUtil::durationToMilliseconds(config_proto_.decay_time()) * 1000000LL
: kDefaultDecayTimeSeconds * 1000000000LL),
Expand All @@ -76,41 +75,16 @@ PeakEwmaLoadBalancer::PeakEwmaLoadBalancer(
addPeakEwmaLbPolicyDataToHosts(host_set->hosts());
}

// Setup callback to add data to new hosts.
priority_update_cb_ =
priority_set_.addPriorityUpdateCb([this](uint32_t, const Upstream::HostVector& hosts_added,
const Upstream::HostVector&) -> absl::Status {
// Setup callback to add data to new hosts and clean up removed hosts.
priority_update_cb_ = priority_set_.addPriorityUpdateCb(
[this](uint32_t, const Upstream::HostVector& hosts_added,
const Upstream::HostVector& hosts_removed) -> absl::Status {
addPeakEwmaLbPolicyDataToHosts(hosts_added);
for (const auto& host : hosts_removed) {
all_host_stats_.erase(host);
}
return absl::OkStatus();
});

// Create timer for EWMA aggregation.
aggregation_timer_ = main_dispatcher_.createTimer([this]() -> void { onAggregationTimer(); });
aggregation_timer_->enableTimer(aggregation_interval_);

// Peak EWMA load balancer initialized successfully.
}

PeakEwmaLoadBalancer::~PeakEwmaLoadBalancer() {
// Post timer cancellation to main thread to avoid cross-thread timer operations.
// Timer must be disabled from the same thread that created it (main_dispatcher_).
if (aggregation_timer_) {
main_dispatcher_.post([timer = std::move(aggregation_timer_)]() mutable {
if (timer) {
timer->disableTimer();
timer.reset();
}
});
}

// EWMA snapshot cleanup is automatic via shared_ptr destructor.

// Clean up host data.
for (const auto& host_set : priority_set_.hostSetsPerPriority()) {
for (const auto& host : host_set->hosts()) {
host->setLbPolicyData(nullptr);
}
}
}

// Host management.
Expand All @@ -130,12 +104,12 @@ PeakEwmaHostLbPolicyData* PeakEwmaLoadBalancer::getPeakEwmaData(Upstream::HostCo
return dynamic_cast<PeakEwmaHostLbPolicyData*>(lb_data.ptr());
}

void PeakEwmaLoadBalancer::onAggregationTimer() {
// Timer callback - aggregate EWMA data from all hosts.
aggregateWorkerData();

// Reschedule timer for next cycle.
aggregation_timer_->enableTimer(aggregation_interval_);
void PeakEwmaLoadBalancer::maybeAggregate() {
const auto now = time_source_.monotonicTime();
if (now - last_aggregation_time_ >= aggregation_interval_) {
aggregateWorkerData();
last_aggregation_time_ = now;
}
}

double PeakEwmaLoadBalancer::calculateHostCost(Upstream::HostConstSharedPtr host) {
Expand Down Expand Up @@ -192,6 +166,9 @@ PeakEwmaLoadBalancer::selectFromTwoCandidates(const Upstream::HostVector& hosts,

Upstream::HostSelectionResponse
PeakEwmaLoadBalancer::chooseHost(Upstream::LoadBalancerContext* /* context */) {
// Lazily aggregate EWMA data if the interval has elapsed.
maybeAggregate();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I then assume it is okay that aggregation only happens when there’s LB traffic — no host selection means no EWMA updates, but this is acceptable since no traffic means no need for a decision.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@botengyao That's correct. If there's no LB traffic, chooseHost() never fires, so aggregation doesn't run and EWMA values go stale.

But that's fine. Stale EWMA only matters when we need to make a selection, and if no one is calling chooseHost() there's no decision to make.

When traffic resumes, the first chooseHost() call will trigger aggregation and pick up any samples that accumulated in the meantime.

Your comment piqued my curiosity, and when I looked into this I spotted two bugs: a ring buffer overflow when more than max_samples are written between aggregations, and an inverted alpha weighting that gave older samples more influence than newer ones. I put fixes in beca896.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sg, and feel free to raise another PR.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@botengyao If it's ok, I included the fixes in this PR.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that also works


// Power of Two Choices selection using host-attached EWMA data.
const auto& host_sets = priority_set_.hostSetsPerPriority();

Expand Down Expand Up @@ -268,23 +245,9 @@ void PeakEwmaLoadBalancer::aggregateWorkerData() {
// Aggregation cycle complete.
}

size_t PeakEwmaLoadBalancer::calculateNewSampleCount(size_t last_processed, size_t current_write,
size_t max_samples) {
if (last_processed == current_write) {
return 0;
}

if (current_write >= last_processed) {
return current_write - last_processed;
} else {
// Write index wrapped around.
return (max_samples - last_processed) + current_write;
}
}

double PeakEwmaLoadBalancer::calculateTimeBasedAlpha(uint64_t current_time_ns,
uint64_t sample_time_ns) {
int64_t time_delta_ns = static_cast<int64_t>(current_time_ns - sample_time_ns);
double PeakEwmaLoadBalancer::calculateTimeBasedAlpha(uint64_t later_time_ns,
uint64_t earlier_time_ns) {
int64_t time_delta_ns = static_cast<int64_t>(later_time_ns - earlier_time_ns);
if (time_delta_ns <= 0) {
return 1.0; // Use full weight for future/concurrent samples.
}
Expand Down Expand Up @@ -316,17 +279,20 @@ void PeakEwmaLoadBalancer::processHostSamples(Upstream::HostConstSharedPtr /* ho

// Get the range of new samples to process (atomic ring buffer).
auto [last_processed, current_write] = data->getNewSampleRange();

size_t num_new_samples =
calculateNewSampleCount(last_processed, current_write, data->max_samples_);
if (num_new_samples == 0)
if (last_processed == current_write)
return;

// If ring buffer was fully overwritten, skip to oldest valid slot.
// Uses unsigned arithmetic (always correct since write_index_ only increments).
if (current_write - last_processed > data->max_samples_) {
last_processed = current_write - data->max_samples_;
}

size_t num_new_samples = current_write - last_processed;

// Get current EWMA state.
double current_ewma = data->getEwmaRtt();
uint64_t current_time_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
time_source_.monotonicTime().time_since_epoch())
.count();
uint64_t reference_time = data->last_update_timestamp_.load();

// Process all new samples in chronological order.
size_t processed_index = last_processed;
Expand All @@ -342,13 +308,14 @@ void PeakEwmaLoadBalancer::processHostSamples(Upstream::HostConstSharedPtr /* ho
continue;
}

double alpha = calculateTimeBasedAlpha(current_time_ns, timestamp_ns);
double alpha = calculateTimeBasedAlpha(timestamp_ns, reference_time);
current_ewma = updateEwmaWithSample(current_ewma, rtt_ms, alpha);
reference_time = timestamp_ns;
processed_index++;
}

// Update atomic EWMA in host data.
data->updateEwma(current_ewma, current_time_ns);
data->updateEwma(current_ewma, reference_time);
data->markSamplesProcessed(current_write);
}

Expand Down
23 changes: 7 additions & 16 deletions contrib/peak_ewma/load_balancing_policies/source/peak_ewma_lb.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,6 @@
#include <new>
#include <vector>

#include "envoy/event/dispatcher.h"
#include "envoy/event/timer.h"
#include "envoy/thread_local/thread_local.h"
#include "envoy/thread_local/thread_local_object.h"
#include "envoy/upstream/load_balancer.h"

#include "source/common/common/callback_impl.h"
Expand Down Expand Up @@ -70,7 +66,7 @@ struct GlobalHostStats {
*
* Architecture:
* - HTTP filter records RTT samples in host-attached ring buffers (lock-free)
* - Timer aggregates samples every 100ms and updates EWMA values
* - Aggregation happens lazily inline in chooseHost() when the interval elapses
* - P2C selection uses current EWMA + active requests for cost calculation
*/
class PeakEwmaLoadBalancer : public Upstream::LoadBalancerBase {
Expand All @@ -80,10 +76,7 @@ class PeakEwmaLoadBalancer : public Upstream::LoadBalancerBase {
Upstream::ClusterLbStats& stats, Runtime::Loader& runtime, Random::RandomGenerator& random,
uint32_t healthy_panic_threshold, const Upstream::ClusterInfo& cluster_info,
TimeSource& time_source,
const envoy::extensions::load_balancing_policies::peak_ewma::v3alpha::PeakEwma& config,
Event::Dispatcher& main_dispatcher);

~PeakEwmaLoadBalancer();
const envoy::extensions::load_balancing_policies::peak_ewma::v3alpha::PeakEwma& config);

// LoadBalancer interface
Upstream::HostSelectionResponse chooseHost(Upstream::LoadBalancerContext* context) override;
Expand All @@ -96,19 +89,18 @@ class PeakEwmaLoadBalancer : public Upstream::LoadBalancerBase {
void addPeakEwmaLbPolicyDataToHosts(const Upstream::HostVector& hosts);
PeakEwmaHostLbPolicyData* getPeakEwmaData(Upstream::HostConstSharedPtr host);

// Timer-based aggregation - processes host-attached sample data.
// Inline aggregation - processes host-attached sample data.
void aggregateWorkerData();
void processHostSamples(Upstream::HostConstSharedPtr host, PeakEwmaHostLbPolicyData* data);
void onAggregationTimer();
void maybeAggregate();

// Power of Two Choices selection.
Upstream::HostConstSharedPtr selectFromTwoCandidates(const Upstream::HostVector& hosts,
uint64_t random_value);
double calculateHostCost(Upstream::HostConstSharedPtr host);

// EWMA calculation helpers.
size_t calculateNewSampleCount(size_t last_processed, size_t current_write, size_t max_samples);
double calculateTimeBasedAlpha(uint64_t current_time_ns, uint64_t sample_time_ns);
double calculateTimeBasedAlpha(uint64_t later_time_ns, uint64_t earlier_time_ns);
double updateEwmaWithSample(double current_ewma, double new_rtt_ms, double alpha);

// Core infrastructure.
Expand All @@ -121,10 +113,9 @@ class PeakEwmaLoadBalancer : public Upstream::LoadBalancerBase {
// Business logic components.
Cost cost_;

// Timer infrastructure for periodic EWMA calculation.
Event::Dispatcher& main_dispatcher_;
Event::TimerPtr aggregation_timer_;
// Inline aggregation state.
const std::chrono::milliseconds aggregation_interval_;
MonotonicTime last_aggregation_time_;

// Host stats for admin interface visibility.
absl::flat_hash_map<Upstream::HostConstSharedPtr, std::unique_ptr<GlobalHostStats>>
Expand Down
15 changes: 13 additions & 2 deletions contrib/peak_ewma/load_balancing_policies/test/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ envoy_cc_test(
"//source/common/network:address_lib",
"//test/common/stats:stat_test_utility_lib",
"//test/mocks:common_lib",
"//test/mocks/event:event_mocks",
"//test/mocks/runtime:runtime_mocks",
"//test/mocks/upstream:upstream_mocks",
],
Expand Down Expand Up @@ -62,7 +61,19 @@ envoy_cc_test(
"//source/common/network:address_lib",
"//test/common/stats:stat_test_utility_lib",
"//test/mocks:common_lib",
"//test/mocks/event:event_mocks",
"//test/mocks/runtime:runtime_mocks",
"//test/mocks/upstream:upstream_mocks",
],
)

envoy_cc_test(
name = "peak_ewma_lb_host_lifecycle_test",
srcs = ["peak_ewma_lb_host_lifecycle_test.cc"],
deps = [
"//contrib/peak_ewma/load_balancing_policies/source:peak_ewma_lb_lib",
"//source/common/network:address_lib",
"//test/common/stats:stat_test_utility_lib",
"//test/mocks:common_lib",
"//test/mocks/runtime:runtime_mocks",
"//test/mocks/upstream:upstream_mocks",
],
Expand Down
6 changes: 2 additions & 4 deletions contrib/peak_ewma/load_balancing_policies/test/config_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

#include "test/common/stats/stat_test_utility.h"
#include "test/mocks/common.h"
#include "test/mocks/event/mocks.h"
#include "test/mocks/runtime/mocks.h"
#include "test/mocks/server/factory_context.h"
#include "test/mocks/upstream/cluster_info.h"
Expand Down Expand Up @@ -65,7 +64,6 @@ class PeakEwmaConfigTest : public ::testing::Test {
NiceMock<Runtime::MockLoader> runtime_;
NiceMock<Random::MockRandomGenerator> random_;
NiceMock<MockTimeSystem> time_source_;
NiceMock<Event::MockDispatcher> dispatcher_;
MockThreadLocalInstance tls_;
};

Expand Down Expand Up @@ -178,13 +176,13 @@ TEST_F(PeakEwmaConfigTest, ConfigValidation) {
// Very small decay time
proto_config.mutable_decay_time()->set_nanos(1000000); // 1ms

TypedPeakEwmaLbConfig config(proto_config, dispatcher_);
TypedPeakEwmaLbConfig config(proto_config);
EXPECT_EQ(config.lb_config_.decay_time().nanos(), 1000000);

// Very large decay time
proto_config.mutable_decay_time()->set_seconds(300);

TypedPeakEwmaLbConfig config2(proto_config, dispatcher_);
TypedPeakEwmaLbConfig config2(proto_config);
EXPECT_EQ(config2.lb_config_.decay_time().seconds(), 300);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

#include "test/common/stats/stat_test_utility.h"
#include "test/mocks/common.h"
#include "test/mocks/event/mocks.h"
#include "test/mocks/runtime/mocks.h"
#include "test/mocks/stats/mocks.h"
#include "test/mocks/upstream/cluster_info.h"
Expand Down Expand Up @@ -58,8 +57,7 @@ class PeakEwmaLoadBalancerComprehensiveTest : public ::testing::Test {

void createLoadBalancer() {
lb_ = std::make_unique<PeakEwmaLoadBalancer>(priority_set_, nullptr, *stats_, runtime_, random_,
50, *cluster_info_, time_source_, config_,
dispatcher_);
50, *cluster_info_, time_source_, config_);
}

// Note: In a real test we would access host data, but for simplicity
Expand All @@ -76,7 +74,6 @@ class PeakEwmaLoadBalancerComprehensiveTest : public ::testing::Test {
NiceMock<Runtime::MockLoader> runtime_;
NiceMock<Random::MockRandomGenerator> random_;
NiceMock<MockTimeSystem> time_source_;
NiceMock<Event::MockDispatcher> dispatcher_;

std::vector<Upstream::HostSharedPtr> hosts_;
std::unique_ptr<PeakEwmaLoadBalancer> lb_;
Expand Down
Loading
Loading