Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions dwctl/src/db/handlers/credits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -750,8 +750,29 @@ impl<'c> Credits<'c> {
/// Perform lazy aggregation for a user's unaggregated batched transactions.
/// This aggregates new transactions into batch_aggregates and marks them as aggregated.
/// Uses a single atomic UPDATE + aggregate approach to handle concurrent reads safely.
///
/// Uses a per-user advisory lock to prevent deadlocks when multiple concurrent reads
/// trigger aggregation simultaneously. If another transaction is already aggregating
/// for this user, we skip — the read query still works with slightly stale groupings.
#[instrument(skip(self), fields(user_id = %abbrev_uuid(&user_id)), err)]
pub async fn aggregate_user_batches(&mut self, user_id: UserId) -> Result<()> {
// Try to acquire a per-user advisory lock (released automatically at transaction end).
// If another transaction is already aggregating for this user, skip to avoid deadlocks
// between concurrent UPDATE credits_transactions + INSERT batch_aggregates statements.
let user_id_str = user_id.to_string();
let locked: bool = sqlx::query_scalar!(
"SELECT pg_try_advisory_xact_lock(hashtext('agg_user_batches_' || $1))",
&user_id_str
)
.fetch_one(&mut *self.db)
.await?
.unwrap_or(false);

if !locked {
trace!("Skipping aggregation for user {}, another transaction holds the lock", user_id);
return Ok(());
}

// Atomically mark transactions as aggregated and aggregate them in one query
// This uses UPDATE ... RETURNING with aggregation via CTE to avoid race conditions
let result = sqlx::query!(
Expand Down