Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions objectstore-server/src/endpoints/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use thiserror::Error;

use crate::auth::AuthError;
use crate::extractors::batch::BatchError;
use crate::rejection::RejectionReason;

/// Error type for API operations.
#[derive(Debug, Error)]
Expand Down Expand Up @@ -100,8 +101,47 @@ impl ApiError {
}
}

impl ApiError {
/// Returns the [`RejectionReason`] for this error, used to emit rejection metrics.
pub fn rejection_reason(&self) -> RejectionReason {
match self {
ApiError::Client(_) => RejectionReason::BadRequest,

ApiError::Auth(AuthError::BadRequest(_)) => RejectionReason::BadRequest,
ApiError::Auth(
AuthError::ValidationFailure(_)
| AuthError::VerificationFailure
| AuthError::NotPermitted,
) => RejectionReason::Auth,
ApiError::Auth(AuthError::InternalError(_)) => RejectionReason::Internal,

ApiError::Batch(
BatchError::BadRequest(_)
| BatchError::Multipart(_)
| BatchError::Metadata(_)
| BatchError::LimitExceeded(_),
) => RejectionReason::BadRequest,
ApiError::Batch(BatchError::RateLimited) => RejectionReason::RateLimit,
ApiError::Batch(BatchError::ResponseSerialization { .. }) => RejectionReason::Internal,

ApiError::Service(ServiceError::Metadata(_)) => RejectionReason::BadRequest,
ApiError::Service(ServiceError::AtCapacity) => RejectionReason::TaskConcurrency,
ApiError::Service(
ServiceError::Io(_)
| ServiceError::Serde { .. }
| ServiceError::Reqwest { .. }
| ServiceError::GcpAuth(_)
| ServiceError::Panic(_)
| ServiceError::Dropped
| ServiceError::Generic { .. },
) => RejectionReason::Internal,
}
}
}

impl IntoResponse for ApiError {
fn into_response(self) -> Response {
self.rejection_reason().emit();
let body = ApiErrorResponse::from_error(&self);
(self.status(), Json(body)).into_response()
}
Expand Down
24 changes: 23 additions & 1 deletion objectstore-server/src/extractors/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ use axum::extract::{
FromRequest, Multipart, Request,
multipart::{Field, MultipartError, MultipartRejection},
};
use axum::response::{IntoResponse, Response};
use futures::{StreamExt, stream::BoxStream};
use objectstore_service::streaming::{Delete, Get, Insert, Operation};
use objectstore_types::metadata::Metadata;
use thiserror::Error;

use crate::batch::{HEADER_BATCH_OPERATION_KEY, HEADER_BATCH_OPERATION_KIND};
use crate::rejection::RejectionReason;

/// Errors that can occur when processing or executing batch operations.
#[derive(Debug, Error)]
Expand Down Expand Up @@ -124,6 +126,26 @@ async fn try_operation_from_field(field: Field<'_>) -> Result<Operation, BatchEr
Ok(operation)
}

/// Rejection type for [`BatchOperationStream`].
///
/// Wraps [`MultipartRejection`] so that invalid multipart requests (e.g. missing
/// `Content-Type` boundary) emit a `server.rejected` metric before responding.
#[derive(Debug)]
pub struct BatchStreamRejection(MultipartRejection);

impl From<MultipartRejection> for BatchStreamRejection {
fn from(r: MultipartRejection) -> Self {
Self(r)
}
}

impl IntoResponse for BatchStreamRejection {
fn into_response(self) -> Response {
RejectionReason::BadRequest.emit();
self.0.into_response()
}
}

/// A lazily-parsed stream of batch operations extracted from a multipart request body.
pub struct BatchOperationStream(pub BoxStream<'static, Result<Operation, BatchError>>);

Expand All @@ -140,7 +162,7 @@ impl<S> FromRequest<S> for BatchOperationStream
where
S: Send + Sync,
{
type Rejection = MultipartRejection;
type Rejection = BatchStreamRejection;

async fn from_request(request: Request, state: &S) -> Result<Self, Self::Rejection> {
let mut multipart = Multipart::from_request(request, state).await?;
Expand Down
13 changes: 13 additions & 0 deletions objectstore-server/src/extractors/id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use serde::{Deserialize, de};

use crate::extractors::Xt;
use crate::extractors::downstream_service::DownstreamService;
use crate::rejection::RejectionReason;
use crate::state::ServiceState;

#[derive(Debug)]
Expand All @@ -19,8 +20,20 @@ pub enum ObjectRejection {
RateLimited,
}

impl ObjectRejection {
/// Returns the [`RejectionReason`] for this rejection, used to emit rejection metrics.
pub fn rejection_reason(&self) -> RejectionReason {
match self {
ObjectRejection::Path(_) => RejectionReason::BadRequest,
ObjectRejection::Killswitched => RejectionReason::Killswitch,
ObjectRejection::RateLimited => RejectionReason::RateLimit,
}
}
}

impl IntoResponse for ObjectRejection {
fn into_response(self) -> Response {
self.rejection_reason().emit();
match self {
ObjectRejection::Path(rejection) => rejection.into_response(),
ObjectRejection::Killswitched => (
Expand Down
1 change: 1 addition & 0 deletions objectstore-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ pub mod killswitches;
pub mod multipart;
pub mod observability;
pub mod rate_limits;
pub mod rejection;
pub mod state;
pub mod web;
46 changes: 46 additions & 0 deletions objectstore-server/src/rejection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
//! Rejection reason classification and metrics emission.
//!
//! [`RejectionReason`] classifies why a request was rejected, providing a single
//! point for emitting the `server.requests.rejected` counter metric with a `reason` tag.

/// Classifies the reason a request was rejected by the server.
///
/// Used to emit a unified `server.requests.rejected` counter tagged with `reason`,
/// providing visibility into each rejection category.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RejectionReason {
/// Request was blocked by a configured killswitch.
Killswitch,
/// Request exceeded a throughput or bandwidth rate limit.
RateLimit,
/// Request failed authentication or authorization checks.
Auth,
/// An internal server error prevented the request from being handled.
Internal,
/// Request was malformed or violated API constraints.
BadRequest,
/// The backend task concurrency limit was reached.
TaskConcurrency,
/// The web server in-flight request concurrency limit was reached.
WebConcurrency,
}

impl RejectionReason {
/// Returns the string used as the `reason` metric tag value.
pub fn as_str(&self) -> &'static str {
match self {
RejectionReason::Killswitch => "killswitch",
RejectionReason::RateLimit => "rate_limit",
RejectionReason::Auth => "auth",
RejectionReason::Internal => "internal",
RejectionReason::BadRequest => "bad_request",
RejectionReason::TaskConcurrency => "task_concurrency",
RejectionReason::WebConcurrency => "web_concurrency",
}
}

/// Emits a `server.requests.rejected` counter tagged with this rejection reason.
pub fn emit(&self) {
objectstore_metrics::counter!("server.requests.rejected": 1, "reason" => self.as_str());
}
}
3 changes: 2 additions & 1 deletion objectstore-server/src/web/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use tokio::time::Instant;
use tower_http::set_header::SetResponseHeaderLayer;

use crate::endpoints::is_internal_route;
use crate::rejection::RejectionReason;
use crate::web::RequestCounter;

/// The value for the `Server` HTTP header.
Expand All @@ -29,7 +30,7 @@ pub async fn limit_web_concurrency(
let route = matched_path.as_ref().map_or("unknown", |m| m.as_str());

if !is_internal_route(route) && counter.count() >= counter.limit() {
objectstore_metrics::counter!("web.concurrency.rejected": 1);
RejectionReason::WebConcurrency.emit();
return StatusCode::SERVICE_UNAVAILABLE.into_response();
}

Expand Down