From 5720a61ea7b46794ec19764887db25fd5ad53b8b Mon Sep 17 00:00:00 2001 From: Jan Michael Auer Date: Tue, 10 Mar 2026 13:58:37 +0100 Subject: [PATCH] feat(server): Add rejection counters and structured logging for all rejection mechanisms MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Before this change, only web and service concurrency rejections emitted a metric counter. Killswitch and rate limit rejections were only logged at DEBUG by the call sites, with no metric counter and no indication of which limit (bandwidth or throughput) had triggered. Every rejection mechanism now emits a `tracing::warn!` and a metric counter: - Killswitches: `server.request.killswitched` counter; warn includes the full killswitch config (usecase, scopes, service pattern) via Debug formatting - Rate limits: `server.request.rate_limited` counter tagged with `reason` (one of `bandwidth_global`, `bandwidth_usecase`, `bandwidth_scope`, `throughput_global`, `throughput_usecase`, `throughput_scope`, `throughput_rule`); warn includes the reason field - Web concurrency: existing `web.concurrency.rejected` counter + new warn - Service concurrency: existing `service.concurrency.rejected` counter + new warn The observability logic is colocated with the check itself — `Killswitches::matches` and `RateLimiter::check` emit their own counters and logs, keeping call sites clean. A new `Killswitches::find` method returns the first matching `&Killswitch`, without emitting a counter or log, for callers that need to inspect it directly. 
Co-Authored-By: Claude --- objectstore-server/src/endpoints/batch.rs | 1 - objectstore-server/src/extractors/id.rs | 4 - objectstore-server/src/killswitches.rs | 19 +++- objectstore-server/src/rate_limits.rs | 109 +++++++++++++++++----- objectstore-server/src/web/middleware.rs | 1 + objectstore-service/src/service.rs | 2 + 6 files changed, 105 insertions(+), 31 deletions(-) diff --git a/objectstore-server/src/endpoints/batch.rs b/objectstore-server/src/endpoints/batch.rs index 7b5b7240..01372ed7 100644 --- a/objectstore-server/src/endpoints/batch.rs +++ b/objectstore-server/src/endpoints/batch.rs @@ -82,7 +82,6 @@ async fn batch( if state.rate_limiter.check(&context) { Ok(()) } else { - tracing::debug!("Batch operation rejected due to rate limits"); Err(ApiError::from(BatchError::RateLimited)) } } diff --git a/objectstore-server/src/extractors/id.rs b/objectstore-server/src/extractors/id.rs index a38e3fdb..7bf41a35 100644 --- a/objectstore-server/src/extractors/id.rs +++ b/objectstore-server/src/extractors/id.rs @@ -65,12 +65,10 @@ impl FromRequestParts for Xt { .killswitches .matches(id.context(), service.as_str()) { - tracing::debug!("Request rejected due to killswitches"); return Err(ObjectRejection::Killswitched); } if !state.rate_limiter.check(id.context()) { - tracing::debug!("Request rejected due to rate limits"); return Err(ObjectRejection::RateLimited); } @@ -140,12 +138,10 @@ impl FromRequestParts for Xt { .killswitches .matches(&context, service.as_str()) { - tracing::debug!("Request rejected due to killswitches"); return Err(ObjectRejection::Killswitched); } if !state.rate_limiter.check(&context) { - tracing::debug!("Request rejected due to rate limits"); return Err(ObjectRejection::RateLimited); } diff --git a/objectstore-server/src/killswitches.rs b/objectstore-server/src/killswitches.rs index 4c1561d4..c1514a3a 100644 --- a/objectstore-server/src/killswitches.rs +++ b/objectstore-server/src/killswitches.rs @@ -20,8 +20,25 @@ pub struct 
Killswitches(pub Vec<Killswitch>); impl Killswitches { /// Returns `true` if any of the contained killswitches matches the given context. + /// + /// On match, emits a `server.request.killswitched` metric counter and a `tracing::warn!` log. pub fn matches(&self, context: &ObjectContext, service: Option<&str>) -> bool { - self.0.iter().any(|s| s.matches(context, service)) + let Some(switch) = self.find(context, service) else { + return false; + }; + + objectstore_metrics::counter!("server.request.killswitched": 1); + tracing::warn!(killswitch = ?switch, "Request rejected: killswitch active"); + true + } + + /// Returns the first killswitch that matches the given context, or `None` if none match. + pub fn find<'a>( + &'a self, + context: &ObjectContext, + service: Option<&str>, + ) -> Option<&'a Killswitch> { + self.0.iter().find(|s| s.matches(context, service)) } } diff --git a/objectstore-server/src/rate_limits.rs b/objectstore-server/src/rate_limits.rs index be5294e8..ef48b6a7 100644 --- a/objectstore-server/src/rate_limits.rs +++ b/objectstore-server/src/rate_limits.rs @@ -8,10 +8,10 @@ //! exponentially weighted moving average (EWMA) updated by a background task every //! 50 ms. All rate-limit checks are synchronous and non-blocking. +use std::fmt; use std::pin::Pin; use std::sync::Arc; -use std::sync::Mutex; -use std::sync::atomic::AtomicU64; +use std::sync::{Mutex, atomic::AtomicU64}; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; @@ -22,6 +22,46 @@ use objectstore_service::id::ObjectContext; use objectstore_types::scope::Scopes; use serde::{Deserialize, Serialize}; +/// Identifies which rate limit triggered a rejection. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum RateLimitRejection { + /// Global bandwidth limit exceeded. + BandwidthGlobal, + /// Per-usecase bandwidth limit exceeded. + BandwidthUsecase, + /// Per-scope bandwidth limit exceeded. + BandwidthScope, + /// Global throughput limit exceeded. 
+ ThroughputGlobal, + /// Per-usecase throughput limit exceeded. + ThroughputUsecase, + /// Per-scope throughput limit exceeded. + ThroughputScope, + /// Per-rule throughput limit exceeded. + ThroughputRule, +} + +impl RateLimitRejection { + /// Returns a static string identifier suitable for use as a metric tag. + pub fn as_str(self) -> &'static str { + match self { + RateLimitRejection::BandwidthGlobal => "bandwidth_global", + RateLimitRejection::BandwidthUsecase => "bandwidth_usecase", + RateLimitRejection::BandwidthScope => "bandwidth_scope", + RateLimitRejection::ThroughputGlobal => "throughput_global", + RateLimitRejection::ThroughputUsecase => "throughput_usecase", + RateLimitRejection::ThroughputScope => "throughput_scope", + RateLimitRejection::ThroughputRule => "throughput_rule", + } + } +} + +impl fmt::Display for RateLimitRejection { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(self.as_str()) + } +} + /// Rate limits for objectstore. #[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)] pub struct RateLimits { @@ -175,12 +215,32 @@ impl RateLimiter { /// Checks if the given context is within the rate limits. /// - /// Returns `true` if the context is within the rate limits, `false` otherwise. + /// Returns `true` if the request is admitted, `false` if it was rejected. On rejection, + /// emits a `server.request.rate_limited` metric counter and a `tracing::warn!` log. + /// Bandwidth is checked before throughput so that rejected requests are never counted + /// toward admitted traffic. pub fn check(&self, context: &ObjectContext) -> bool { // Bandwidth is checked first because it is a pure read (no token consumption). // Throughput increments the EWMA accumulator only on success, so checking it // second ensures rejected requests are never counted toward admitted traffic. 
- self.bandwidth.check(context) && self.throughput.check(context) + let rejection = self + .bandwidth + .check(context) + .or_else(|| self.throughput.check(context)); + + let Some(rejection) = rejection else { + return true; + }; + + objectstore_metrics::counter!( + "server.request.rate_limited": 1, + "reason" => rejection.as_str(), + ); + tracing::warn!( + reason = rejection.as_str(), + "Request rejected: rate limit exceeded" + ); + false } /// Returns all bandwidth accumulators (global + per-usecase + per-scope) for the given context. @@ -366,10 +426,8 @@ impl BandwidthRateLimiter { } } - fn check(&self, context: &ObjectContext) -> bool { - let Some(global_bps) = self.config.global_bps else { - return true; - }; + fn check(&self, context: &ObjectContext) -> Option<RateLimitRejection> { + let global_bps = self.config.global_bps?; // Global check if self @@ -378,7 +436,7 @@ impl BandwidthRateLimiter { .load(std::sync::atomic::Ordering::Relaxed) > global_bps { - return false; + return Some(RateLimitRejection::BandwidthGlobal); } // Per-usecase check @@ -390,7 +448,7 @@ impl BandwidthRateLimiter { .load(std::sync::atomic::Ordering::Relaxed) > usecase_bps { - return false; + return Some(RateLimitRejection::BandwidthUsecase); } } @@ -403,11 +461,11 @@ impl BandwidthRateLimiter { .load(std::sync::atomic::Ordering::Relaxed) > scope_bps { - return false; + return Some(RateLimitRejection::BandwidthScope); } } - true + None } /// Returns all accumulators (global + per-usecase + per-scope) for the given context. @@ -509,14 +567,15 @@ impl ThroughputRateLimiter { }); } - fn check(&self, context: &ObjectContext) -> bool { + fn check(&self, context: &ObjectContext) -> Option<RateLimitRejection> { // NB: We intentionally use unwrap and crash the server if the mutexes are poisoned. 
// Global check - if let Some(ref global) = self.global - && !global.lock().unwrap().try_acquire() - { - return false; + if let Some(ref global) = self.global { + let acquired = global.lock().unwrap().try_acquire(); + if !acquired { + return Some(RateLimitRejection::ThroughputGlobal); + } } // Usecase check - only if both global_rps and usecase_pct are set @@ -525,7 +584,7 @@ impl ThroughputRateLimiter { let bucket = guard .get_or_insert_with(context.usecase.clone(), || self.create_bucket(usecase_rps)); if !bucket.lock().unwrap().try_acquire() { - return false; + return Some(RateLimitRejection::ThroughputUsecase); } } @@ -535,7 +594,7 @@ impl ThroughputRateLimiter { let bucket = guard.get_or_insert_with(context.scopes.clone(), || self.create_bucket(scope_rps)); if !bucket.lock().unwrap().try_acquire() { - return false; + return Some(RateLimitRejection::ThroughputScope); } } @@ -550,7 +609,7 @@ impl ThroughputRateLimiter { let guard = self.rules.pin(); let bucket = guard.get_or_insert_with(idx, || self.create_bucket(rule_rps)); if !bucket.lock().unwrap().try_acquire() { - return false; + return Some(RateLimitRejection::ThroughputRule); } } @@ -563,7 +622,7 @@ impl ThroughputRateLimiter { self.total_admitted .fetch_add(1, std::sync::atomic::Ordering::Relaxed); - true + None } fn create_bucket(&self, rps: u32) -> Mutex { @@ -754,8 +813,8 @@ mod tests { ); let context = make_context(); - assert!(limiter.check(&context)); - assert!(limiter.check(&context)); + assert!(limiter.check(&context).is_none()); + assert!(limiter.check(&context).is_none()); assert_eq!( limiter @@ -776,8 +835,8 @@ mod tests { let context = make_context(); // First call admitted (consumes the one token), second rejected. 
- assert!(limiter.check(&context)); - assert!(!limiter.check(&context)); + assert!(limiter.check(&context).is_none()); + assert!(limiter.check(&context).is_some()); assert_eq!( limiter diff --git a/objectstore-server/src/web/middleware.rs b/objectstore-server/src/web/middleware.rs index 948bae69..0d62ed40 100644 --- a/objectstore-server/src/web/middleware.rs +++ b/objectstore-server/src/web/middleware.rs @@ -35,6 +35,7 @@ pub async fn limit_web_concurrency( "web.concurrency.rejected": 1, "service" => service.as_str().unwrap_or("unknown"), ); + tracing::warn!("Request rejected: web concurrency limit reached"); return StatusCode::SERVICE_UNAVAILABLE.into_response(); } diff --git a/objectstore-service/src/service.rs b/objectstore-service/src/service.rs index e0bb94d2..e9d7def7 100644 --- a/objectstore-service/src/service.rs +++ b/objectstore-service/src/service.rs @@ -205,6 +205,7 @@ impl StorageService { }; let reservation = acquire_result.inspect_err(|_| { objectstore_metrics::counter!("service.concurrency.rejected": 1); + tracing::warn!("Request rejected: service at capacity"); })?; Ok(StreamExecutor { @@ -249,6 +250,7 @@ impl StorageService { { let permit = self.concurrency.try_acquire().inspect_err(|_| { objectstore_metrics::counter!("service.concurrency.rejected": 1); + tracing::warn!("Request rejected: service at capacity"); })?; crate::concurrency::spawn_metered(operation, permit, f).await