Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion objectstore-server/src/endpoints/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ async fn batch(
if state.rate_limiter.check(&context) {
Ok(())
} else {
tracing::debug!("Batch operation rejected due to rate limits");
Err(ApiError::from(BatchError::RateLimited))
}
}
Expand Down
4 changes: 0 additions & 4 deletions objectstore-server/src/extractors/id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,10 @@ impl FromRequestParts<ServiceState> for Xt<ObjectId> {
.killswitches
.matches(id.context(), service.as_str())
{
tracing::debug!("Request rejected due to killswitches");
return Err(ObjectRejection::Killswitched);
}

if !state.rate_limiter.check(id.context()) {
tracing::debug!("Request rejected due to rate limits");
return Err(ObjectRejection::RateLimited);
}

Expand Down Expand Up @@ -140,12 +138,10 @@ impl FromRequestParts<ServiceState> for Xt<ObjectContext> {
.killswitches
.matches(&context, service.as_str())
{
tracing::debug!("Request rejected due to killswitches");
return Err(ObjectRejection::Killswitched);
}

if !state.rate_limiter.check(&context) {
tracing::debug!("Request rejected due to rate limits");
return Err(ObjectRejection::RateLimited);
}

Expand Down
19 changes: 18 additions & 1 deletion objectstore-server/src/killswitches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,25 @@ pub struct Killswitches(pub Vec<Killswitch>);

impl Killswitches {
    /// Returns `true` if any of the contained killswitches matches the given context.
    ///
    /// On match, emits a `server.request.killswitched` metric counter and a
    /// `tracing::warn!` log identifying the matching killswitch.
    pub fn matches(&self, context: &ObjectContext, service: Option<&str>) -> bool {
        // Guard clause: no matching switch means the request is admitted.
        let Some(switch) = self.find(context, service) else {
            return false;
        };

        objectstore_metrics::counter!("server.request.killswitched": 1);
        tracing::warn!(killswitch = ?switch, "Request rejected: killswitch active");
        true
    }

    /// Returns the first killswitch that matches the given context, or `None` if none match.
    pub fn find<'a>(
        &'a self,
        context: &ObjectContext,
        service: Option<&str>,
    ) -> Option<&'a Killswitch> {
        self.0.iter().find(|s| s.matches(context, service))
    }
}

Expand Down
109 changes: 84 additions & 25 deletions objectstore-server/src/rate_limits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
//! exponentially weighted moving average (EWMA) updated by a background task every
//! 50 ms. All rate-limit checks are synchronous and non-blocking.

use std::fmt;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::AtomicU64;
use std::sync::{Mutex, atomic::AtomicU64};
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

Expand All @@ -22,6 +22,46 @@ use objectstore_service::id::ObjectContext;
use objectstore_types::scope::Scopes;
use serde::{Deserialize, Serialize};

/// Identifies which rate limit triggered a rejection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RateLimitRejection {
    /// Global bandwidth limit exceeded.
    BandwidthGlobal,
    /// Per-usecase bandwidth limit exceeded.
    BandwidthUsecase,
    /// Per-scope bandwidth limit exceeded.
    BandwidthScope,
    /// Global throughput limit exceeded.
    ThroughputGlobal,
    /// Per-usecase throughput limit exceeded.
    ThroughputUsecase,
    /// Per-scope throughput limit exceeded.
    ThroughputScope,
    /// Per-rule throughput limit exceeded.
    ThroughputRule,
}

impl RateLimitRejection {
    /// Returns a static string identifier suitable for use as a metric tag.
    pub fn as_str(self) -> &'static str {
        // `Self::` keeps the arms terse; the derive on `Copy` lets us take
        // `self` by value for free.
        match self {
            Self::BandwidthGlobal => "bandwidth_global",
            Self::BandwidthUsecase => "bandwidth_usecase",
            Self::BandwidthScope => "bandwidth_scope",
            Self::ThroughputGlobal => "throughput_global",
            Self::ThroughputUsecase => "throughput_usecase",
            Self::ThroughputScope => "throughput_scope",
            Self::ThroughputRule => "throughput_rule",
        }
    }
}

impl fmt::Display for RateLimitRejection {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.as_str())
    }
}

/// Rate limits for objectstore.
#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
pub struct RateLimits {
Expand Down Expand Up @@ -175,12 +215,32 @@ impl RateLimiter {

/// Checks if the given context is within the rate limits.
///
/// Returns `true` if the request is admitted, `false` if it was rejected. On rejection,
/// emits a `server.request.rate_limited` metric counter (tagged with the rejection
/// reason) and a `tracing::warn!` log.
///
/// Bandwidth is checked before throughput so that rejected requests are never counted
/// toward admitted traffic.
pub fn check(&self, context: &ObjectContext) -> bool {
    // First rejection wins. `or_else` keeps the throughput check lazy, so its
    // token accounting only runs when bandwidth has already admitted the request.
    let rejection = self
        .bandwidth
        .check(context)
        .or_else(|| self.throughput.check(context));

    let Some(rejection) = rejection else {
        return true;
    };

    objectstore_metrics::counter!(
        "server.request.rate_limited": 1,
        "reason" => rejection.as_str(),
    );
    tracing::warn!(
        reason = rejection.as_str(),
        "Request rejected: rate limit exceeded"
    );
    false
}

/// Returns all bandwidth accumulators (global + per-usecase + per-scope) for the given context.
Expand Down Expand Up @@ -366,10 +426,8 @@ impl BandwidthRateLimiter {
}
}

fn check(&self, context: &ObjectContext) -> bool {
let Some(global_bps) = self.config.global_bps else {
return true;
};
fn check(&self, context: &ObjectContext) -> Option<RateLimitRejection> {
let global_bps = self.config.global_bps?;

// Global check
if self
Expand All @@ -378,7 +436,7 @@ impl BandwidthRateLimiter {
.load(std::sync::atomic::Ordering::Relaxed)
> global_bps
{
return false;
return Some(RateLimitRejection::BandwidthGlobal);
}

// Per-usecase check
Expand All @@ -390,7 +448,7 @@ impl BandwidthRateLimiter {
.load(std::sync::atomic::Ordering::Relaxed)
> usecase_bps
{
return false;
return Some(RateLimitRejection::BandwidthUsecase);
}
}

Expand All @@ -403,11 +461,11 @@ impl BandwidthRateLimiter {
.load(std::sync::atomic::Ordering::Relaxed)
> scope_bps
{
return false;
return Some(RateLimitRejection::BandwidthScope);
}
}

true
None
}

/// Returns all accumulators (global + per-usecase + per-scope) for the given context.
Expand Down Expand Up @@ -509,14 +567,15 @@ impl ThroughputRateLimiter {
});
}

fn check(&self, context: &ObjectContext) -> bool {
fn check(&self, context: &ObjectContext) -> Option<RateLimitRejection> {
// NB: We intentionally use unwrap and crash the server if the mutexes are poisoned.

// Global check
if let Some(ref global) = self.global
&& !global.lock().unwrap().try_acquire()
{
return false;
if let Some(ref global) = self.global {
let acquired = global.lock().unwrap().try_acquire();
if !acquired {
return Some(RateLimitRejection::ThroughputGlobal);
}
}

// Usecase check - only if both global_rps and usecase_pct are set
Expand All @@ -525,7 +584,7 @@ impl ThroughputRateLimiter {
let bucket = guard
.get_or_insert_with(context.usecase.clone(), || self.create_bucket(usecase_rps));
if !bucket.lock().unwrap().try_acquire() {
return false;
return Some(RateLimitRejection::ThroughputUsecase);
}
}

Expand All @@ -535,7 +594,7 @@ impl ThroughputRateLimiter {
let bucket =
guard.get_or_insert_with(context.scopes.clone(), || self.create_bucket(scope_rps));
if !bucket.lock().unwrap().try_acquire() {
return false;
return Some(RateLimitRejection::ThroughputScope);
}
}

Expand All @@ -550,7 +609,7 @@ impl ThroughputRateLimiter {
let guard = self.rules.pin();
let bucket = guard.get_or_insert_with(idx, || self.create_bucket(rule_rps));
if !bucket.lock().unwrap().try_acquire() {
return false;
return Some(RateLimitRejection::ThroughputRule);
}
}

Expand All @@ -563,7 +622,7 @@ impl ThroughputRateLimiter {
self.total_admitted
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

true
None
}

fn create_bucket(&self, rps: u32) -> Mutex<TokenBucket> {
Expand Down Expand Up @@ -754,8 +813,8 @@ mod tests {
);

let context = make_context();
assert!(limiter.check(&context));
assert!(limiter.check(&context));
assert!(limiter.check(&context).is_none());
assert!(limiter.check(&context).is_none());

assert_eq!(
limiter
Expand All @@ -776,8 +835,8 @@ mod tests {

let context = make_context();
// First call admitted (consumes the one token), second rejected.
assert!(limiter.check(&context));
assert!(!limiter.check(&context));
assert!(limiter.check(&context).is_none());
assert!(limiter.check(&context).is_some());

assert_eq!(
limiter
Expand Down
1 change: 1 addition & 0 deletions objectstore-server/src/web/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub async fn limit_web_concurrency(
"web.concurrency.rejected": 1,
"service" => service.as_str().unwrap_or("unknown"),
);
tracing::warn!("Request rejected: web concurrency limit reached");
return StatusCode::SERVICE_UNAVAILABLE.into_response();
}

Expand Down
2 changes: 2 additions & 0 deletions objectstore-service/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ impl StorageService {
};
let reservation = acquire_result.inspect_err(|_| {
objectstore_metrics::counter!("service.concurrency.rejected": 1);
tracing::warn!("Request rejected: service at capacity");
})?;

Ok(StreamExecutor {
Expand Down Expand Up @@ -249,6 +250,7 @@ impl StorageService {
{
let permit = self.concurrency.try_acquire().inspect_err(|_| {
objectstore_metrics::counter!("service.concurrency.rejected": 1);
tracing::warn!("Request rejected: service at capacity");
})?;

crate::concurrency::spawn_metered(operation, permit, f).await
Expand Down