diff --git a/objectstore-server/src/endpoints/common.rs b/objectstore-server/src/endpoints/common.rs index dd2bd4e7..26d9364c 100644 --- a/objectstore-server/src/endpoints/common.rs +++ b/objectstore-server/src/endpoints/common.rs @@ -11,6 +11,7 @@ use thiserror::Error; use crate::auth::AuthError; use crate::extractors::batch::BatchError; +use crate::rejection::RejectionReason; /// Error type for API operations. #[derive(Debug, Error)] @@ -100,8 +101,47 @@ impl ApiError { } } +impl ApiError { + /// Returns the [`RejectionReason`] for this error, used to emit rejection metrics. + pub fn rejection_reason(&self) -> RejectionReason { + match self { + ApiError::Client(_) => RejectionReason::BadRequest, + + ApiError::Auth(AuthError::BadRequest(_)) => RejectionReason::BadRequest, + ApiError::Auth( + AuthError::ValidationFailure(_) + | AuthError::VerificationFailure + | AuthError::NotPermitted, + ) => RejectionReason::Auth, + ApiError::Auth(AuthError::InternalError(_)) => RejectionReason::Internal, + + ApiError::Batch( + BatchError::BadRequest(_) + | BatchError::Multipart(_) + | BatchError::Metadata(_) + | BatchError::LimitExceeded(_), + ) => RejectionReason::BadRequest, + ApiError::Batch(BatchError::RateLimited) => RejectionReason::RateLimit, + ApiError::Batch(BatchError::ResponseSerialization { .. }) => RejectionReason::Internal, + + ApiError::Service(ServiceError::Metadata(_)) => RejectionReason::BadRequest, + ApiError::Service(ServiceError::AtCapacity) => RejectionReason::TaskConcurrency, + ApiError::Service( + ServiceError::Io(_) + | ServiceError::Serde { .. } + | ServiceError::Reqwest { .. } + | ServiceError::GcpAuth(_) + | ServiceError::Panic(_) + | ServiceError::Dropped + | ServiceError::Generic { .. }, + ) => RejectionReason::Internal, + } + } +} + impl IntoResponse for ApiError { fn into_response(self) -> Response { + self.rejection_reason().emit(); let body = ApiErrorResponse::from_error(&self); (self.status(), Json(body)).into_response() } diff --git a/objectstore-server/src/extractors/batch.rs b/objectstore-server/src/extractors/batch.rs index 4f84f677..9e1d3bfd 100644 --- a/objectstore-server/src/extractors/batch.rs +++ b/objectstore-server/src/extractors/batch.rs @@ -9,12 +9,14 @@ use axum::extract::{ FromRequest, Multipart, Request, multipart::{Field, MultipartError, MultipartRejection}, }; +use axum::response::{IntoResponse, Response}; use futures::{StreamExt, stream::BoxStream}; use objectstore_service::streaming::{Delete, Get, Insert, Operation}; use objectstore_types::metadata::Metadata; use thiserror::Error; use crate::batch::{HEADER_BATCH_OPERATION_KEY, HEADER_BATCH_OPERATION_KIND}; +use crate::rejection::RejectionReason; /// Errors that can occur when processing or executing batch operations. #[derive(Debug, Error)] @@ -124,6 +126,26 @@ async fn try_operation_from_field(field: Field<'_>) -> Result for BatchStreamRejection { + fn from(r: MultipartRejection) -> Self { + Self(r) + } +} + +impl IntoResponse for BatchStreamRejection { + fn into_response(self) -> Response { + RejectionReason::BadRequest.emit(); + self.0.into_response() + } +} + /// A lazily-parsed stream of batch operations extracted from a multipart request body. pub struct BatchOperationStream(pub BoxStream<'static, Result>); @@ -140,7 +162,7 @@ impl FromRequest for BatchOperationStream where S: Send + Sync, { - type Rejection = MultipartRejection; + type Rejection = BatchStreamRejection; async fn from_request(request: Request, state: &S) -> Result { let mut multipart = Multipart::from_request(request, state).await?; diff --git a/objectstore-server/src/extractors/id.rs b/objectstore-server/src/extractors/id.rs index a38e3fdb..672f2e9d 100644 --- a/objectstore-server/src/extractors/id.rs +++ b/objectstore-server/src/extractors/id.rs @@ -10,6 +10,7 @@ use serde::{Deserialize, de}; use crate::extractors::Xt; use crate::extractors::downstream_service::DownstreamService; +use crate::rejection::RejectionReason; use crate::state::ServiceState; #[derive(Debug)] @@ -19,8 +20,20 @@ pub enum ObjectRejection { RateLimited, } +impl ObjectRejection { + /// Returns the [`RejectionReason`] for this rejection, used to emit rejection metrics. + pub fn rejection_reason(&self) -> RejectionReason { + match self { + ObjectRejection::Path(_) => RejectionReason::BadRequest, + ObjectRejection::Killswitched => RejectionReason::Killswitch, + ObjectRejection::RateLimited => RejectionReason::RateLimit, + } + } +} + impl IntoResponse for ObjectRejection { fn into_response(self) -> Response { + self.rejection_reason().emit(); match self { ObjectRejection::Path(rejection) => rejection.into_response(), ObjectRejection::Killswitched => ( diff --git a/objectstore-server/src/lib.rs b/objectstore-server/src/lib.rs index 50328baf..cf0bb745 100644 --- a/objectstore-server/src/lib.rs +++ b/objectstore-server/src/lib.rs @@ -13,5 +13,6 @@ pub mod killswitches; pub mod multipart; pub mod observability; pub mod rate_limits; +pub mod rejection; pub mod state; pub mod web; diff --git a/objectstore-server/src/rejection.rs b/objectstore-server/src/rejection.rs new file mode 100644 index 00000000..fbc3a145 --- /dev/null +++ b/objectstore-server/src/rejection.rs @@ -0,0 +1,46 @@ +//! Rejection reason classification and metrics emission. +//! +//! [`RejectionReason`] classifies why a request was rejected, providing a single +//! point for emitting the `server.requests.rejected` counter metric with a `reason` tag. + +/// Classifies the reason a request was rejected by the server. +/// +/// Used to emit a unified `server.requests.rejected` counter tagged with `reason`, +/// providing visibility into each rejection category. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum RejectionReason { + /// Request was blocked by a configured killswitch. + Killswitch, + /// Request exceeded a throughput or bandwidth rate limit. + RateLimit, + /// Request failed authentication or authorization checks. + Auth, + /// An internal server error prevented the request from being handled. + Internal, + /// Request was malformed or violated API constraints. + BadRequest, + /// The backend task concurrency limit was reached. + TaskConcurrency, + /// The web server in-flight request concurrency limit was reached. + WebConcurrency, +} + +impl RejectionReason { + /// Returns the string used as the `reason` metric tag value. + pub fn as_str(&self) -> &'static str { + match self { + RejectionReason::Killswitch => "killswitch", + RejectionReason::RateLimit => "rate_limit", + RejectionReason::Auth => "auth", + RejectionReason::Internal => "internal", + RejectionReason::BadRequest => "bad_request", + RejectionReason::TaskConcurrency => "task_concurrency", + RejectionReason::WebConcurrency => "web_concurrency", + } + } + + /// Emits a `server.requests.rejected` counter tagged with this rejection reason. + pub fn emit(&self) { + objectstore_metrics::counter!("server.requests.rejected": 1, "reason" => self.as_str()); + } +} diff --git a/objectstore-server/src/web/middleware.rs b/objectstore-server/src/web/middleware.rs index 2a4756eb..f1b5c545 100644 --- a/objectstore-server/src/web/middleware.rs +++ b/objectstore-server/src/web/middleware.rs @@ -10,6 +10,7 @@ use tokio::time::Instant; use tower_http::set_header::SetResponseHeaderLayer; use crate::endpoints::is_internal_route; +use crate::rejection::RejectionReason; use crate::web::RequestCounter; /// The value for the `Server` HTTP header. @@ -29,7 +30,7 @@ pub async fn limit_web_concurrency( let route = matched_path.as_ref().map_or("unknown", |m| m.as_str()); if !is_internal_route(route) && counter.count() >= counter.limit() { - objectstore_metrics::counter!("web.concurrency.rejected": 1); + RejectionReason::WebConcurrency.emit(); return StatusCode::SERVICE_UNAVAILABLE.into_response(); }