ref(outcomes): add accepted outcomes consumer #7781
Conversation
Cursor Bugbot has reviewed your changes and found 2 potential issues.
Autofix Details
Bugbot Autofix prepared fixes for both issues found in the latest run.
- ✅ Fixed: Batch loses `bucket_interval` after first flush - I replaced `mem::take` with `mem::replace(..., AggregatedOutcomesBatch::new(self.bucket_interval))` so flushed batches preserve the configured interval for subsequent messages.
- ✅ Fixed: `TrackOutcome` missing required `outcome` field for billing - I added `outcome: u8` to `TrackOutcome` and set it to `0` when producing accepted outcomes so serialized billing messages include the required outcome type.
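The first fix matters because `mem::take` swaps in `Default::default()`, whose `bucket_interval` would be zero. A minimal sketch with a hypothetical `Batch` stand-in (not the real `AggregatedOutcomesBatch`):

```rust
// Hypothetical stand-in for AggregatedOutcomesBatch: `mem::take` leaves
// Default::default() behind (bucket_interval == 0), while `mem::replace`
// lets us leave behind a batch built with the configured interval.
#[derive(Default)]
pub struct Batch {
    pub bucket_interval: u64,
}

pub fn flush_with_take(batch: &mut Batch) -> Batch {
    // The configured interval is lost for subsequent messages.
    std::mem::take(batch)
}

pub fn flush_with_replace(batch: &mut Batch) -> Batch {
    let interval = batch.bucket_interval;
    // A fresh batch with the same configured interval stays behind.
    std::mem::replace(batch, Batch { bucket_interval: interval })
}
```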
Or push these changes by commenting:
@cursor push 14d9fa48b0
Preview (14d9fa48b0)
diff --git a/rust_snuba/src/strategies/accepted_outcomes/aggregator.rs b/rust_snuba/src/strategies/accepted_outcomes/aggregator.rs
--- a/rust_snuba/src/strategies/accepted_outcomes/aggregator.rs
+++ b/rust_snuba/src/strategies/accepted_outcomes/aggregator.rs
@@ -75,7 +75,10 @@
where
TNext: ProcessingStrategy<AggregatedOutcomesBatch>,
{
- let batch = std::mem::take(&mut self.batch);
+ let batch = std::mem::replace(
+ &mut self.batch,
+ AggregatedOutcomesBatch::new(self.bucket_interval),
+ );
let latest_offsets = std::mem::take(&mut self.latest_offsets);
// Committable offset is latest_offset + 1 (next offset to consume) per partition.
@@ -402,4 +405,28 @@
aggregator.poll().unwrap();
assert_eq!(aggregator.batch.num_buckets(), 0);
}
+
+ #[test]
+ fn flush_keeps_bucket_interval_for_next_batch() {
+ let mut aggregator = OutcomesAggregator::new(
+ Noop { last_message: None },
+ 500,
+ Duration::from_millis(30_000),
+ 60,
+ );
+
+ let partition = Partition::new(Topic::new("accepted-outcomes"), 0);
+ aggregator
+ .submit(Message::new_broker_message(
+ make_payload(6_000, 1, 2, 3, &[(4, 7)]),
+ partition,
+ 0,
+ Utc::now(),
+ ))
+ .unwrap();
+
+ aggregator.flush().unwrap();
+
+ assert_eq!(aggregator.batch.bucket_interval, 60);
+ }
}
diff --git a/rust_snuba/src/strategies/accepted_outcomes/produce_outcome.rs b/rust_snuba/src/strategies/accepted_outcomes/produce_outcome.rs
--- a/rust_snuba/src/strategies/accepted_outcomes/produce_outcome.rs
+++ b/rust_snuba/src/strategies/accepted_outcomes/produce_outcome.rs
@@ -58,6 +58,7 @@
org_id: key.org_id,
project_id: key.project_id,
key_id: key.key_id,
+ outcome: 0,
category: key.category,
quantity: stats.quantity,
};
@@ -219,6 +220,14 @@
let produced = produced_payloads.lock().unwrap();
assert_eq!(produced.len(), 2);
+ for payload in produced.iter() {
+ let payload = payload.payload().unwrap();
+ let body: serde_json::Value = serde_json::from_slice(payload).unwrap();
+ assert_eq!(
+ body.get("outcome").and_then(|value| value.as_u64()),
+ Some(0)
+ );
+ }
}
#[test]
diff --git a/rust_snuba/src/types.rs b/rust_snuba/src/types.rs
--- a/rust_snuba/src/types.rs
+++ b/rust_snuba/src/types.rs
@@ -544,6 +544,8 @@
pub org_id: u64,
pub project_id: u64,
pub key_id: u64,
+ /// Outcome enum value (0 = accepted)
+ pub outcome: u8,
/// DataCategory uint32 value as defined in Relay
pub category: u32,
pub quantity: u64,

    self.batch = batch;
    self.latest_offsets = latest_offsets;
    Ok(())
}
Flush on MessageRejected silently swallows, doesn't retry
Medium Severity
When flush() encounters SubmitError::MessageRejected, it restores the batch and offsets but returns Ok(()). Because last_flush is only updated on success, the next poll() will trigger another flush(), so the restored batch does get retried; the warn log may simply be misleading in high-frequency scenarios. More importantly: when poll() triggers a flush() that silently swallows a reject, the caller's poll() still proceeds to call self.next_step.poll(), which could advance internal state in the next step without the batch having been forwarded.
    .and_modify(|o| *o = (*o).max(offset))
    .or_insert(offset);
}
self.next_step.submit(message)
CommitOutcomes stores offsets before downstream step accepts message
Medium Severity
CommitOutcomes::submit() unconditionally stores offsets in commit_positions before calling self.next_step.submit(message). If the next step (ProduceAcceptedOutcome) returns MessageRejected, the offsets are already stored. When OutcomesAggregator catches MessageRejected in flush(), it restores the batch for retry and returns Ok(()). The subsequent self.next_step.poll() call in OutcomesAggregator::poll() invokes CommitOutcomes::poll(), which drains those prematurely stored offsets as a CommitRequest. This commits offsets for outcomes that were never produced — a crash at that point causes permanent outcome loss.
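A hedged sketch of the ordering this review is asking for, using simplified stand-in types (not the actual arroyo strategy trait): record commit positions only after the downstream submit succeeds, so a rejected message can never leak offsets into a CommitRequest.

```rust
use std::collections::HashMap;

// Simplified stand-ins for the real strategy types.
pub struct Rejected;

pub struct CommitOutcomes {
    // partition -> next offset to commit
    pub commit_positions: HashMap<u32, u64>,
    // Models whether next_step.submit() would accept the message.
    pub downstream_accepts: bool,
}

impl CommitOutcomes {
    pub fn submit(&mut self, partition: u32, offset: u64) -> Result<(), Rejected> {
        // Try the downstream step FIRST; on MessageRejected nothing is
        // stored, so a later poll() cannot commit unproduced outcomes.
        if !self.downstream_accepts {
            return Err(Rejected);
        }
        self.commit_positions
            .entry(partition)
            .and_modify(|o| *o = (*o).max(offset + 1))
            .or_insert(offset + 1);
        Ok(())
    }
}
```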
Cursor Bugbot has reviewed your changes and found 2 potential issues.
Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.
        Err(StrategyError::InvalidMessage(e))
        }
    }
}
Flush submits empty batches unnecessarily
Low Severity
The flush method doesn't check whether the batch is empty before creating and submitting a message. join always calls flush unconditionally, and poll triggers flush whenever max_batch_time_ms elapses even if no data has been accumulated. This results in empty AggregatedOutcomesBatch messages being submitted through the entire pipeline (commit + produce), doing unnecessary cloning and processing.
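A minimal sketch of the suggested guard (hypothetical types, not the consumer's actual code): skip submission entirely when nothing has been aggregated.

```rust
// Hypothetical batch with a bucket count, standing in for
// AggregatedOutcomesBatch.
pub struct Batch {
    pub num_buckets: usize,
}

// Returns true if a message was actually submitted downstream.
pub fn flush(batch: &Batch) -> bool {
    if batch.num_buckets == 0 {
        // Nothing aggregated since the last flush: skip the clone,
        // the commit step, and the produce step entirely.
        return false;
    }
    // ...clone the batch and submit it to the next step here...
    true
}
```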
I wonder if the nomenclature (for the CLI) is a little confusing here. It sounds like this consumer consumes from snuba-items but produces to snuba-outcomes. In general a snuba consumer of [...]. Would it be clearer as [...]? Don't want to block merging on this question, but it just popped out to me.
That's fair, [...]
onewland left a comment
This generally looks good to me, but maybe we could add a comment in a reasonable place in the code explaining the choices you've made for the questions I've asked, so people can find that context more easily later
pub project_id: u64,
pub key_id: u64,
/// DataCategory uint32 value as defined in Relay
pub category: u32,
does category capture the item_type? that seems like important context for usage/billing data
yes, and an item type can have multiple categories, e.g. LOG_ITEM and LOG_BYTE. if someone adds a new item type that should be paid for (aka not an internal item type) then a Relay data category should be added for it
use crate::types::AggregatedOutcomesBatch;

pub struct CommitOutcomes<TNext> {
Is CommitOutcomes just what we're using to advance the consumer group on the AcceptedOutcomesConsumer on the snuba-items topic? Or something more advanced than that? Can we add a comment explaining why it needs to exist?
I would have used the arroyo CommitOffsets step, except that it doesn't have a next step because it's always assumed to be at the end of the pipeline. But yeah, it's just used to commit. I'll also have to add handling for MessageRejected from the producer step.
    &self.concurrency,
    self.skip_produce,
);
let commit = CommitOutcomes::new(produce);
are we committing on snuba-items before we produce to outcomes? if so, was that an explicit decision, and can we explain why somewhere in code?
yeah, that was an explicit decision. under-producing outcomes is the better option for us than over-producing in worst-case scenarios. I can add comments for that
Context
Project outline and history is located at https://www.notion.so/sentry/Outcomes-for-EAP-2f48b10e4b5d80b0acf6f33a11f53889
CLI command
Args:
- `storage` - used to get the topic we read from (snuba-items); this should always be `eap_items`
- `consumer-group` - self explanatory
- `max-batch-time-ms` - max time for aggregating outcomes
- `max-batch-size` - max number of buckets we will create when aggregating outcomes; we should almost always be maxing out on time, not size
- `bucket-interval` - the time granularity we roll up to, in seconds. 60 seconds means we round the timestamp to the minute for each bucket key
- `accepted-outcomes-topic` - the topic we produce accepted outcomes to; should always be `outcomes-billing` (at least for now, but still made configurable)

In terms of the strategies:
- `OutcomesAggregator` - batches up the outcomes from the `TraceItem` messages and also keeps track of the latest offsets for each partition
  - input: `TraceItem` message
  - output: `AggregatedOutcomesBatch` message
- `CommitOutcomes` - gets the committable from the message and commits
  - input: `AggregatedOutcomesBatch` message
  - output: `AggregatedOutcomesBatch` message
- `ProduceAcceptedOutcome` - for each of the outcomes in the `AggregatedOutcomesBatch`, produce to the `outcomes-billing` topic
  - input: `AggregatedOutcomesBatch` message

Related Ops PR: https://github.com/getsentry/ops/pull/19551
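As a sanity check on the `bucket-interval` semantics described above, rounding a timestamp down to its bucket key is just integer truncation. A sketch (not the consumer's actual code):

```rust
// Round a unix timestamp (seconds) down to the start of its bucket.
// With a 60s interval, every timestamp within the same minute maps
// to the same bucket key.
pub fn bucket_key(timestamp: u64, bucket_interval: u64) -> u64 {
    timestamp - (timestamp % bucket_interval)
}
```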