Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 18 additions & 6 deletions relay-server/src/processing/transactions/types/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ use crate::processing::spans::Indexed;
use crate::processing::transactions::types::{ExpandedTransaction, StandaloneProfile};
use crate::processing::{Forward, ForwardContext};
use crate::services::outcome::{DiscardReason, Outcome};
#[cfg(feature = "processing")]
use crate::services::store::StoreEnvelope;

/// Output of the transaction processor.
#[derive(Debug)]
Expand Down Expand Up @@ -74,11 +72,25 @@ impl Forward for TransactionOutput {
s: StoreHandle<'_>,
ctx: ForwardContext<'_>,
) -> Result<(), Rejected<()>> {
let envelope: ManagedEnvelope = self.serialize_envelope(ctx)?.into();
use crate::envelope::ItemType;

let envelope = self.serialize_envelope(ctx)?;
let envelope = ManagedEnvelope::from(envelope).into_processed();

s.send_to_store(StoreEnvelope {
envelope: envelope.into_processed(),
});
let has_attachments = envelope
.envelope()
.items()
.any(|item| item.ty() == &ItemType::Attachment);
let use_objectstore = || {
let options = &ctx.global_config.options;
crate::utils::sample(options.objectstore_attachments_sample_rate).is_keep()
};

if has_attachments && use_objectstore() {
s.send_to_objectstore(crate::services::store::StoreEnvelope { envelope });
} else {
s.send_to_store(crate::services::store::StoreEnvelope { envelope });
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplicated objectstore routing logic across two processors

Low Severity

The entire forward_store body (serialize → check attachments → sample objectstore → route) is now duplicated verbatim between ErrorOutput and TransactionOutput. If the routing logic ever needs a fix or new condition, both copies need identical updates — a likely source of future divergence. This could be extracted into a shared helper (e.g., a free function or a default method on the Forward trait that delegates to serialize_envelope).

Fix in Cursor Fix in Web

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this sucks, we will address this separately


Ok(())
}
Expand Down
36 changes: 31 additions & 5 deletions tests/integration/test_attachments.py
Original file line number Diff line number Diff line change
Expand Up @@ -567,19 +567,31 @@ def test_view_hierarchy_processing(
outcomes_consumer.assert_empty()


@pytest.mark.parametrize("use_objectstore", [False, True])
def test_event_with_attachment(
mini_sentry,
relay_with_processing,
attachments_consumer,
transactions_consumer,
use_objectstore,
objectstore,
):
project_id = 42
event_id = "515539018c9b4260a6f999572f1661ee"

mini_sentry.add_full_project_config(project_id)
relay = relay_with_processing()

if use_objectstore:
mini_sentry.global_config["options"][
"relay.objectstore-attachments.sample-rate"
] = 1.0

relay = relay_with_processing(
{"processing": {"upload": {"objectstore_url": "http://127.0.0.1:8888/"}}}
)
attachments_consumer = attachments_consumer()
transactions_consumer = transactions_consumer()
objectstore = objectstore("attachments", project_id)

# event attachments are always sent as chunks, and added to events
envelope = Envelope(headers=[["event_id", event_id]])
Expand All @@ -593,8 +605,9 @@ def test_event_with_attachment(

relay.send_envelope(project_id, envelope)

chunk, _ = attachments_consumer.get_attachment_chunk()
assert chunk == b"event attachment"
if not use_objectstore:
chunk, _ = attachments_consumer.get_attachment_chunk()
assert chunk == b"event attachment"

_, event_message = attachments_consumer.get_event()

Expand All @@ -606,10 +619,14 @@ def test_event_with_attachment(
"content_type": "application/octet-stream",
"attachment_type": "event.attachment",
"size": len(b"event attachment"),
"chunks": 1,
**({"stored_id": mock.ANY} if use_objectstore else {"chunks": 1}),
}
]

if use_objectstore:
stored_id = event_message["attachments"][0]["stored_id"]
assert objectstore.get(stored_id).payload.read() == b"event attachment"

# transaction attachments are sent as individual attachments,
# either using chunks by default, or contents inlined
envelope = Envelope(headers=[["event_id", event_id]])
Expand All @@ -629,11 +646,20 @@ def test_event_with_attachment(
"content_type": "application/octet-stream",
"attachment_type": "event.attachment",
"size": len(b"transaction attachment"),
"data": b"transaction attachment",
**(
{"stored_id": mock.ANY}
if use_objectstore
else {"data": b"transaction attachment"}
),
}

attachment = attachments_consumer.get_individual_attachment()
assert attachment["attachment"].pop("id")

if use_objectstore:
stored_id = attachment["attachment"]["stored_id"]
assert objectstore.get(stored_id).payload.read() == b"transaction attachment"

assert attachment == {
"type": "attachment",
"attachment": expected_attachment,
Expand Down
Loading