Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
8d61d82
initial logic
tobias-wilfert Mar 6, 2026
3080083
Merge branch 'master' into tobias-wilfert/feat/migrate-standalone-att…
tobias-wilfert Mar 6, 2026
9d30699
split logic into different files & send attachment to object store
tobias-wilfert Mar 9, 2026
e31ba26
Merge branch 'tobias-wilfert/feat/migrate-standalone-attachments' of …
tobias-wilfert Mar 9, 2026
f2af288
split strange test into two
tobias-wilfert Mar 9, 2026
4990f24
Merge branch 'master' into tobias-wilfert/feat/migrate-standalone-att…
tobias-wilfert Mar 9, 2026
3e99a9e
remove old code
tobias-wilfert Mar 9, 2026
2c0674b
add changelog entry
tobias-wilfert Mar 9, 2026
c3a1bc5
Merge branch 'tobias-wilfert/feat/migrate-standalone-attachments' of …
tobias-wilfert Mar 9, 2026
b22e064
small fixes
tobias-wilfert Mar 9, 2026
51816ad
match on the item type rather than attachment type
tobias-wilfert Mar 10, 2026
3e542be
Update relay-server/src/processing/attachments/forward.rs
tobias-wilfert Mar 10, 2026
60269da
Merge branch 'master' into tobias-wilfert/feat/migrate-standalone-att…
tobias-wilfert Mar 10, 2026
7600e86
change if to debug assert
tobias-wilfert Mar 10, 2026
3c7ccca
add lenient for `ViewHierarchyScrubbing`
tobias-wilfert Mar 10, 2026
3e64ddd
Update relay-server/src/services/objectstore.rs
tobias-wilfert Mar 11, 2026
1587af6
Update relay-server/src/services/store.rs
tobias-wilfert Mar 11, 2026
f4f644d
Merge branch 'master' into tobias-wilfert/feat/migrate-standalone-att…
tobias-wilfert Mar 11, 2026
bf956af
use `let else` instead of `ok_or_else`
tobias-wilfert Mar 11, 2026
a660b09
move if check to being of the function
tobias-wilfert Mar 11, 2026
89ac3c9
rename `handle_attachment` function
tobias-wilfert Mar 11, 2026
4bf8560
simplify debug assert and add clarifying comment
tobias-wilfert Mar 11, 2026
53da87b
Merge branch 'master' into tobias-wilfert/feat/migrate-standalone-att…
tobias-wilfert Mar 11, 2026
bcdd552
remove unused import
tobias-wilfert Mar 11, 2026
5d4ea17
merge tests
tobias-wilfert Mar 11, 2026
6eb71e4
Merge branch 'master' into tobias-wilfert/feat/migrate-standalone-att…
tobias-wilfert Mar 11, 2026
2c35b8e
add metric to verify stuff ends up in the new processor
tobias-wilfert Mar 12, 2026
87e70d0
Merge branch 'tobias-wilfert/feat/migrate-standalone-attachments' of …
tobias-wilfert Mar 12, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
- Strip performance metric specs from extraction while keeping extraction interfaces intact. ([#5674](https://github.com/getsentry/relay/pull/5674))
- Allow deferred lengths to the `/upload` endpoint when the sender is trusted. ([#5658](https://github.com/getsentry/relay/pull/5658))
- Use new processor architecture to process client reports. ([#5686](https://github.com/getsentry/relay/pull/5686))
- Use new processor architecture to process standalone attachments. ([#5703](https://github.com/getsentry/relay/pull/5703))
- Prevent timeouts on the `/upload` endpoint. ([#5692](https://github.com/getsentry/relay/pull/5692))
- Handle traffic bursts in the objectstore service. ([#5689](https://github.com/getsentry/relay/pull/5689))
- Disable `fetch_materials` on GoCD `pipeline-complete` stages. ([#5697](https://github.com/getsentry/relay/pull/5697))
Expand Down
55 changes: 55 additions & 0 deletions relay-server/src/processing/attachments/forward.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use crate::Envelope;
use crate::managed::{Managed, Rejected};
use crate::processing::attachments::AttachmentsOutput;
use crate::processing::{self, Forward};

impl Forward for AttachmentsOutput {
fn serialize_envelope(
self,
_: processing::ForwardContext<'_>,
) -> Result<Managed<Box<Envelope>>, Rejected<()>> {
let Self(attachments) = self;
Ok(attachments.map(|attachments, _| {
Envelope::from_parts(attachments.headers, attachments.attachments)
}))
}

#[cfg(feature = "processing")]
fn forward_store(
self,
s: processing::StoreHandle<'_>,
ctx: processing::ForwardContext<'_>,
) -> Result<(), Rejected<()>> {
use crate::processing::attachments::Error;

let Self(attachments) = self;

let Some(event_id) = attachments.headers.event_id() else {
return Err(attachments.reject_err(Error::NoEventId).map(drop));
};

let use_objectstore = {
let options = &ctx.global_config.options;
crate::utils::sample(options.objectstore_attachments_sample_rate).is_keep()
};

for attachment in attachments.split(|attachment| attachment.attachments) {
let store_attachment = attachment.map(|attachment, _| {
use crate::services::store::StoreAttachment;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Move imports to top of file (feature gated).

let quantities = attachment.quantities();
StoreAttachment {
event_id,
attachment,
quantities,
}
});
if use_objectstore {
s.send_to_objectstore(store_attachment);
} else {
s.send_to_store(store_attachment);
}
}

Ok(())
}
}
138 changes: 138 additions & 0 deletions relay-server/src/processing/attachments/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
use std::sync::Arc;

use relay_quotas::RateLimits;

use crate::envelope::{EnvelopeHeaders, Item, ItemType, Items};
use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities, Rejected};
use crate::processing::{self, CountRateLimited, Output, QuotaRateLimiter};
#[cfg(feature = "processing")]
use crate::services::outcome::DiscardReason;
use crate::services::outcome::Outcome;
use crate::statsd::RelayCounters;

mod forward;
mod process;

#[derive(Debug, thiserror::Error)]
pub enum Error {
/// The Attachment was rate limited.
#[error("rate limited")]
RateLimited(RateLimits),

/// The envelope did not contain an event ID.
#[cfg(feature = "processing")]
#[error("missing event ID")]
NoEventId,
}

impl OutcomeError for Error {
type Error = Self;

fn consume(self) -> (Option<crate::services::outcome::Outcome>, Self::Error) {
let outcome = match &self {
Self::RateLimited(limits) => {
let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
Some(Outcome::RateLimited(reason_code))
}
#[cfg(feature = "processing")]
Self::NoEventId => Some(Outcome::Invalid(DiscardReason::Internal)),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume Relay somehow already ensures there is always an event id?

What happens if the there is a standalone attachment sent in an envelope without event id? Does it make sense to ingest it? And does that actually mean it's Internal?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The old logic checks it in the store here, so this check "just moved into the processor".

event_id.ok_or(StoreError::NoEventId)?,

The outcome is the same as that of the store error (but we can ofcourse improve it):

pub enum StoreError {
#[error("failed to send the message to kafka: {0}")]
SendFailed(#[from] ClientError),
#[error("failed to encode data: {0}")]
EncodingFailed(std::io::Error),
#[error("failed to store event because event id was missing")]
NoEventId,
}
impl OutcomeError for StoreError {
type Error = Self;
fn consume(self) -> (Option<Outcome>, Self::Error) {
(Some(Outcome::Invalid(DiscardReason::Internal)), self)
}
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 Figure we could/should do an explicit validation in the processing code and then reject as invalid but not internal, if it can be caused by users sending broken data.

};
(outcome, self)
}
}

impl From<RateLimits> for Error {
fn from(value: RateLimits) -> Self {
Self::RateLimited(value)
}
}
Comment on lines +44 to +48
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does RateLimited(#[from] RateLimits) not work?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gets this funky error:

error[E0599]: the method `as_dyn_error` exists for reference `&relay_quotas::RateLimits`, but its trait bounds were not satisfied
   --> relay-server/src/processing/attachments/mod.rs:19:19
    |
 19 |     RateLimited(#[from] RateLimits),
    |                   ^^^^ method cannot be called on `&relay_quotas::RateLimits` due to unsatisfied trait bounds
    |
   ::: relay-quotas/src/rate_limit.rs:250:1
    |
250 | pub struct RateLimits {
    | --------------------- doesn't satisfy `relay_quotas::RateLimits: AsDynError<'_>` or `relay_quotas::RateLimits: StdError`
    |
    = note: the following trait bounds were not satisfied:
            `relay_quotas::RateLimits: StdError`
            which is required by `relay_quotas::RateLimits: AsDynError<'_>`
            `&relay_quotas::RateLimits: StdError`
            which is required by `&relay_quotas::RateLimits: AsDynError<'_>`


/// A processor for Attachments.
pub struct AttachmentProcessor {
limiter: Arc<QuotaRateLimiter>,
}

impl AttachmentProcessor {
/// Creates a new [`Self`].
pub fn new(limiter: Arc<QuotaRateLimiter>) -> Self {
Self { limiter }
}
}

impl processing::Processor for AttachmentProcessor {
type UnitOfWork = SerializedAttachments;
type Output = AttachmentsOutput;
type Error = Error;

fn prepare_envelope(
&self,
envelope: &mut ManagedEnvelope,
) -> Option<Managed<Self::UnitOfWork>> {
debug_assert!(
!envelope.envelope().items().any(Item::creates_event),
"AttachmentProcessor should not receive items that create events"
);

let attachments = envelope
.envelope_mut()
.take_items_by(|i| i.requires_event() && matches!(i.ty(), ItemType::Attachment));

if attachments.is_empty() {
return None;
}

let headers = envelope.envelope().headers().clone();
let work = SerializedAttachments {
headers,
attachments,
};
Some(Managed::with_meta_from(envelope, work))
}

async fn process(
&self,
attachments: Managed<Self::UnitOfWork>,
ctx: processing::Context<'_>,
) -> Result<processing::Output<Self::Output>, Rejected<Self::Error>> {
for item in &attachments.attachments {
let attachment_type_tag = match item.attachment_type() {
Some(t) => &t.to_string(),
None => "",
};
relay_statsd::metric!(
counter(RelayCounters::StandaloneItem) += 1,
processor = "new",
item_type = item.ty().name(),
attachment_type = attachment_type_tag,
);
}

let mut attachments = self.limiter.enforce_quotas(attachments, ctx).await?;
process::scrub(&mut attachments, ctx)?;

Ok(Output::just(AttachmentsOutput(attachments)))
}
}

/// Serialized attachments extracted from an envelope.
#[derive(Debug)]
pub struct SerializedAttachments {
/// Original envelope headers.
headers: EnvelopeHeaders,
/// A list of attachments.
attachments: Items,
}

impl Counted for SerializedAttachments {
fn quantities(&self) -> Quantities {
self.attachments.quantities()
}
}

impl CountRateLimited for Managed<SerializedAttachments> {
type Error = Error;
}

/// Output produced by the [`AttachmentProcessor`].
#[derive(Debug)]
pub struct AttachmentsOutput(Managed<SerializedAttachments>);
18 changes: 18 additions & 0 deletions relay-server/src/processing/attachments/process.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use crate::managed::{Managed, Rejected};
use crate::processing::attachments::{Error, SerializedAttachments};
use crate::processing::{self, utils};

/// Runs PiiProcessors on the attachments.
pub fn scrub(
attachments: &mut Managed<SerializedAttachments>,
ctx: processing::Context<'_>,
) -> Result<(), Rejected<Error>> {
attachments.try_modify(|attachments, records| {
utils::attachments::scrub(
attachments.attachments.iter_mut(),
ctx.project_info,
Some(records),
);
Ok::<_, Error>(())
})
}
2 changes: 2 additions & 0 deletions relay-server/src/processing/common.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::Envelope;
use crate::managed::{Managed, Rejected};
use crate::processing::ForwardContext;
use crate::processing::attachments::AttachmentProcessor;
use crate::processing::check_ins::CheckInsProcessor;
use crate::processing::errors::ErrorsProcessor;
use crate::processing::logs::LogsProcessor;
Expand Down Expand Up @@ -68,4 +69,5 @@ outputs!(
TraceAttachments => TraceAttachmentsProcessor,
TraceMetrics => TraceMetricsProcessor,
Replays => ReplaysProcessor,
Attachments => AttachmentProcessor,
);
1 change: 1 addition & 0 deletions relay-server/src/processing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub use self::common::*;
pub use self::forward::*;
pub use self::limits::*;

pub mod attachments;
pub mod check_ins;
pub mod client_reports;
pub mod errors;
Expand Down
6 changes: 3 additions & 3 deletions relay-server/src/processing/trace_attachments/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ use crate::processing::utils::store::{
AttributeMeta, extract_client_sample_rate, extract_meta_attributes, proto_timestamp,
quantities_to_trace_item_outcomes, uuid_to_item_id,
};
use crate::services::objectstore::StoreAttachment;
use crate::services::objectstore::StoreTraceAttachment;
use crate::services::outcome::{DiscardReason, Outcome};

/// Converts an expanded attachment to a storable unit.
pub fn convert(
attachment: Managed<ExpandedAttachment>,
retention: Retention,
server_sample_rate: Option<f64>,
) -> Result<Managed<StoreAttachment>, Rejected<()>> {
) -> Result<Managed<StoreTraceAttachment>, Rejected<()>> {
let scoping = attachment.scoping();
let received_at = attachment.received_at();
attachment.try_map(|attachment, _record_keeper| {
Expand All @@ -42,7 +42,7 @@ pub fn convert(
let trace_item = attachment_to_trace_item(meta, quantities, ctx)
.ok_or(Outcome::Invalid(DiscardReason::InvalidTraceAttachment))?;

Ok::<_, Outcome>(StoreAttachment { trace_item, body })
Ok::<_, Outcome>(StoreTraceAttachment { trace_item, body })
})
}

Expand Down
Loading
Loading