From 6a978d7539c9e03c843f9371f56a369c5a79a3f8 Mon Sep 17 00:00:00 2001 From: Evan Hicks Date: Fri, 6 Mar 2026 16:29:50 -0500 Subject: [PATCH] feat: make postgres table name configurable via pg_table_name config Allow different deployments to use different PostgreSQL table names for inflight task activations. Migrations use a {TABLE_NAME} placeholder with runtime substitution and checksum recomputation via sha2. All queries use format! with the configured table name. Defaults to inflight_taskactivations for backwards compatibility. Co-Authored-By: Claude Sonnet 4.6 --- .../0001_create_inflight_activations.sql | 2 +- src/config.rs | 4 + src/store/postgres_activation_store.rs | 211 ++++++++++++------ 3 files changed, 147 insertions(+), 70 deletions(-) diff --git a/pg_migrations/0001_create_inflight_activations.sql b/pg_migrations/0001_create_inflight_activations.sql index ee8b26a4..7d18b18a 100644 --- a/pg_migrations/0001_create_inflight_activations.sql +++ b/pg_migrations/0001_create_inflight_activations.sql @@ -1,5 +1,5 @@ -- PostgreSQL equivalent of the inflight_taskactivations table -CREATE TABLE IF NOT EXISTS inflight_taskactivations ( +CREATE TABLE IF NOT EXISTS {TABLE_NAME} ( id TEXT NOT NULL PRIMARY KEY, activation BYTEA NOT NULL, partition INTEGER NOT NULL, diff --git a/src/config.rs b/src/config.rs index 67b571d2..97ab23cf 100644 --- a/src/config.rs +++ b/src/config.rs @@ -146,6 +146,9 @@ pub struct Config { /// The name of the postgres database to use for the inflight activation store. pub pg_database_name: String, + /// The PostgreSQL table name for inflight task activations. + pub pg_table_name: String, + /// The path to the sqlite database pub db_path: String, @@ -285,6 +288,7 @@ impl Default for Config { run_migrations: false, pg_url: "postgres://postgres:password@sentry-postgres-1:5432/".to_owned(), pg_database_name: "default".to_owned(), + pg_table_name: "inflight_taskactivations".to_owned(), db_write_failure_backoff_ms: 4000, db_insert_batch_max_len: 256, db_insert_batch_max_size: 16_000_000, diff --git a/src/store/postgres_activation_store.rs b/src/store/postgres_activation_store.rs index fdfc493b..9c3534c4 100644 --- a/src/store/postgres_activation_store.rs +++ b/src/store/postgres_activation_store.rs @@ -45,6 +45,7 @@ pub async fn create_default_postgres_pool(url: &str) -> Result, E pub struct PostgresActivationStoreConfig { pub pg_url: String, pub pg_database_name: String, + pub table_name: String, pub run_migrations: bool, pub max_processing_attempts: usize, pub processing_deadline_grace_sec: u64, @@ -57,6 +58,7 @@ impl PostgresActivationStoreConfig { Self { pg_url: config.pg_url.clone(), pg_database_name: config.pg_database_name.clone(), + table_name: config.pg_table_name.clone(), run_migrations: config.run_migrations, max_processing_attempts: config.max_processing_attempts, vacuum_page_count: config.vacuum_page_count, @@ -66,6 +68,31 @@ impl PostgresActivationStoreConfig { } } +fn build_migrator(table_name: &str) -> sqlx::migrate::Migrator { + use sha2::{Digest, Sha384}; + use sqlx::migrate::{Migration, Migrator}; + use std::borrow::Cow; + + let base = sqlx::migrate!("./pg_migrations"); + let migrations: Vec = base + .migrations + .iter() + .map(|m| { + let sql = m.sql.replace("{TABLE_NAME}", table_name); + let checksum = Sha384::digest(sql.as_bytes()).to_vec(); + Migration { + sql: Cow::Owned(sql), + checksum: Cow::Owned(checksum), + ..m.clone() + } + }) + .collect(); + Migrator { + migrations: Cow::Owned(migrations), + ..base + } +} + pub struct PostgresActivationStore { read_pool: PgPool, write_pool: PgPool, @@ -111,7 +138,7 @@ impl PostgresActivationStore { if config.run_migrations { println!("Running migrations on database"); - sqlx::migrate!("./pg_migrations").run(&write_pool).await?; + build_migrator(&config.table_name).run(&write_pool).await?; } Ok(Self { @@ -153,7 +180,7 @@ impl InflightActivationStore for PostgresActivationStore { /// Get an activation by id. Primarily used for testing async fn get_by_id(&self, id: &str) -> Result, Error> { - let row_result: Option = sqlx::query_as( + let row_result: Option = sqlx::query_as(&format!( " SELECT id, activation, @@ -172,10 +199,11 @@ impl InflightActivationStore for PostgresActivationStore { namespace, taskname, on_attempts_exceeded - FROM inflight_taskactivations + FROM {table_name} WHERE id = $1 ", - ) + table_name = self.config.table_name + )) .bind(id) .fetch_optional(&self.read_pool) .await?; @@ -192,9 +220,9 @@ impl InflightActivationStore for PostgresActivationStore { if batch.is_empty() { return Ok(QueryResult { rows_affected: 0 }); } - let mut query_builder = QueryBuilder::::new( + let mut query_builder = QueryBuilder::::new(format!( " - INSERT INTO inflight_taskactivations + INSERT INTO {table_name} ( id, activation, @@ -215,7 +243,8 @@ impl InflightActivationStore for PostgresActivationStore { on_attempts_exceeded ) ", - ); + table_name = self.config.table_name + )); let rows = batch .into_iter() .map(TableRow::try_from) @@ -264,12 +293,13 @@ impl InflightActivationStore for PostgresActivationStore { let now = Utc::now(); let grace_period = self.config.processing_deadline_grace_sec; - let mut query_builder = QueryBuilder::new( + let mut query_builder = QueryBuilder::new(format!( "WITH selected_activations AS ( SELECT id - FROM inflight_taskactivations + FROM {table_name} WHERE status = ", - ); + table_name = self.config.table_name + )); query_builder.push_bind(InflightActivationStatus::Pending.to_string()); query_builder.push(" AND (expires_at IS NULL OR expires_at > "); query_builder.push_bind(now); @@ -297,14 +327,18 @@ impl InflightActivationStore for PostgresActivationStore { } query_builder.push(" FOR UPDATE SKIP LOCKED)"); query_builder.push(format!( - "UPDATE inflight_taskactivations + "UPDATE {table_name} SET processing_deadline = now() + (processing_deadline_duration * interval '1 second') + (interval '{grace_period} seconds'), - status = " + status = ", + table_name = self.config.table_name )); query_builder.push_bind(InflightActivationStatus::Processing.to_string()); query_builder.push(" FROM selected_activations "); - query_builder.push(" WHERE inflight_taskactivations.id = selected_activations.id"); + query_builder.push(format!( + " WHERE {table_name}.id = selected_activations.id", + table_name = self.config.table_name + )); query_builder.push(" RETURNING *, kafka_offset AS offset"); let mut conn = self @@ -324,15 +358,16 @@ impl InflightActivationStore for PostgresActivationStore { /// Tasks with delay_until set, will have their age adjusted based on their /// delay time. No tasks = 0 lag async fn pending_activation_max_lag(&self, now: &DateTime) -> f64 { - let result = sqlx::query( + let result = sqlx::query(&format!( "SELECT received_at, delay_until - FROM inflight_taskactivations + FROM {table_name} WHERE status = $1 AND processing_attempts = 0 ORDER BY received_at ASC LIMIT 1 ", - ) + table_name = self.config.table_name + )) .bind(InflightActivationStatus::Pending.to_string()) .fetch_one(&self.read_pool) .await; @@ -354,18 +389,23 @@ impl InflightActivationStore for PostgresActivationStore { #[instrument(skip_all)] async fn count_by_status(&self, status: InflightActivationStatus) -> Result { - let result = - sqlx::query("SELECT COUNT(*) as count FROM inflight_taskactivations WHERE status = $1") - .bind(status.to_string()) - .fetch_one(&self.read_pool) - .await?; + let result = sqlx::query(&format!( + "SELECT COUNT(*) as count FROM {table_name} WHERE status = $1", + table_name = self.config.table_name + )) + .bind(status.to_string()) + .fetch_one(&self.read_pool) + .await?; Ok(result.get::("count") as usize) } async fn count(&self) -> Result { - let result = sqlx::query("SELECT COUNT(*) as count FROM inflight_taskactivations") - .fetch_one(&self.read_pool) - .await?; + let result = sqlx::query(&format!( + "SELECT COUNT(*) as count FROM {table_name}", + table_name = self.config.table_name + )) + .fetch_one(&self.read_pool) + .await?; Ok(result.get::("count") as usize) } @@ -377,9 +417,10 @@ impl InflightActivationStore for PostgresActivationStore { status: InflightActivationStatus, ) -> Result, Error> { let mut conn = self.acquire_write_conn_metric("set_status").await?; - let result: Option = sqlx::query_as( - "UPDATE inflight_taskactivations SET status = $1 WHERE id = $2 RETURNING *, kafka_offset AS offset", - ) + let result: Option = sqlx::query_as(&format!( + "UPDATE {table_name} SET status = $1 WHERE id = $2 RETURNING *, kafka_offset AS offset", + table_name = self.config.table_name + )) .bind(status.to_string()) .bind(id) .fetch_optional(&mut *conn) @@ -401,27 +442,33 @@ impl InflightActivationStore for PostgresActivationStore { let mut conn = self .acquire_write_conn_metric("set_processing_deadline") .await?; - sqlx::query("UPDATE inflight_taskactivations SET processing_deadline = $1 WHERE id = $2") - .bind(deadline.unwrap()) - .bind(id) - .execute(&mut *conn) - .await?; + sqlx::query(&format!( + "UPDATE {table_name} SET processing_deadline = $1 WHERE id = $2", + table_name = self.config.table_name + )) + .bind(deadline.unwrap()) + .bind(id) + .execute(&mut *conn) + .await?; Ok(()) } #[instrument(skip_all)] async fn delete_activation(&self, id: &str) -> Result<(), Error> { let mut conn = self.acquire_write_conn_metric("delete_activation").await?; - sqlx::query("DELETE FROM inflight_taskactivations WHERE id = $1") - .bind(id) - .execute(&mut *conn) - .await?; + sqlx::query(&format!( + "DELETE FROM {table_name} WHERE id = $1", + table_name = self.config.table_name + )) + .bind(id) + .execute(&mut *conn) + .await?; Ok(()) } #[instrument(skip_all)] async fn get_retry_activations(&self) -> Result, Error> { - Ok(sqlx::query_as( + Ok(sqlx::query_as(&format!( " SELECT id, activation, @@ -440,10 +487,11 @@ impl InflightActivationStore for PostgresActivationStore { namespace, taskname, on_attempts_exceeded - FROM inflight_taskactivations + FROM {table_name} WHERE status = $1 ", - ) + table_name = self.config.table_name + )) .bind(InflightActivationStatus::Retry.to_string()) .fetch_all(&self.read_pool) .await? @@ -455,9 +503,12 @@ impl InflightActivationStore for PostgresActivationStore { // Used in tests async fn clear(&self) -> Result<(), Error> { let mut conn = self.acquire_write_conn_metric("clear").await?; - sqlx::query("TRUNCATE TABLE inflight_taskactivations") - .execute(&mut *conn) - .await?; + sqlx::query(&format!( + "TRUNCATE TABLE {table_name}", + table_name = self.config.table_name + )) + .execute(&mut *conn) + .await?; Ok(()) } @@ -472,11 +523,12 @@ impl InflightActivationStore for PostgresActivationStore { // At-most-once tasks that fail their processing deadlines go directly to failure // there are no retries, as the worker will reject the task due to at_most_once keys. - let most_once_result = sqlx::query( - "UPDATE inflight_taskactivations + let most_once_result = sqlx::query(&format!( + "UPDATE {table_name} SET processing_deadline = null, status = $1 WHERE processing_deadline < $2 AND at_most_once = TRUE AND status = $3", - ) + table_name = self.config.table_name + )) .bind(InflightActivationStatus::Failure.to_string()) .bind(now) .bind(InflightActivationStatus::Processing.to_string()) @@ -491,9 +543,12 @@ impl InflightActivationStore for PostgresActivationStore { // Update regular tasks. // Increment processing_attempts by 1 and reset processing_deadline to null. let result = sqlx::query( - "UPDATE inflight_taskactivations + &format!( + "UPDATE {table_name} SET processing_deadline = null, status = $1, processing_attempts = processing_attempts + 1 WHERE processing_deadline < $2 AND status = $3", + table_name = self.config.table_name + ), ) .bind(InflightActivationStatus::Pending.to_string()) .bind(now) @@ -518,11 +573,12 @@ impl InflightActivationStore for PostgresActivationStore { let mut conn = self .acquire_write_conn_metric("handle_processing_attempts") .await?; - let processing_attempts_result = sqlx::query( - "UPDATE inflight_taskactivations + let processing_attempts_result = sqlx::query(&format!( + "UPDATE {table_name} SET status = $1 WHERE processing_attempts >= $2 AND status = $3", - ) + table_name = self.config.table_name + )) .bind(InflightActivationStatus::Failure.to_string()) .bind(self.config.max_processing_attempts as i32) .bind(InflightActivationStatus::Pending.to_string()) @@ -547,7 +603,10 @@ impl InflightActivationStore for PostgresActivationStore { let now = Utc::now(); let mut conn = self.acquire_write_conn_metric("handle_expires_at").await?; let query = sqlx::query( - "DELETE FROM inflight_taskactivations WHERE status = $1 AND expires_at IS NOT NULL AND expires_at < $2", + &format!( + "DELETE FROM {table_name} WHERE status = $1 AND expires_at IS NOT NULL AND expires_at < $2", + table_name = self.config.table_name + ), ) .bind(InflightActivationStatus::Pending.to_string()) .bind(now) @@ -567,12 +626,13 @@ impl InflightActivationStore for PostgresActivationStore { async fn handle_delay_until(&self) -> Result { let now = Utc::now(); let mut conn = self.acquire_write_conn_metric("handle_delay_until").await?; - let update_result = sqlx::query( - r#"UPDATE inflight_taskactivations + let update_result = sqlx::query(&format!( + "UPDATE {table_name} SET status = $1 WHERE delay_until IS NOT NULL AND delay_until < $2 AND status = $3 - "#, - ) + ", + table_name = self.config.table_name + )) .bind(InflightActivationStatus::Pending.to_string()) .bind(now) .bind(InflightActivationStatus::Delay.to_string()) @@ -592,13 +652,15 @@ impl InflightActivationStore for PostgresActivationStore { async fn handle_failed_tasks(&self) -> Result { let mut atomic = self.write_pool.begin().await?; - let failed_tasks: Vec = - sqlx::query("SELECT id, activation, on_attempts_exceeded FROM inflight_taskactivations WHERE status = $1") - .bind(InflightActivationStatus::Failure.to_string()) - .fetch_all(&mut *atomic) - .await? - .into_iter() - .collect(); + let failed_tasks: Vec = sqlx::query(&format!( + "SELECT id, activation, on_attempts_exceeded FROM {table_name} WHERE status = $1", + table_name = self.config.table_name + )) + .bind(InflightActivationStatus::Failure.to_string()) + .fetch_all(&mut *atomic) + .await? + .into_iter() + .collect(); let mut forwarder = FailedTasksForwarder { to_discard: vec![], @@ -623,7 +685,10 @@ impl InflightActivationStore for PostgresActivationStore { } if !forwarder.to_discard.is_empty() { - let mut query_builder = QueryBuilder::new("UPDATE inflight_taskactivations "); + let mut query_builder = QueryBuilder::new(format!( + "UPDATE {table_name} ", + table_name = self.config.table_name + )); query_builder .push("SET status = ") .push_bind(InflightActivationStatus::Complete.to_string()) @@ -646,7 +711,10 @@ impl InflightActivationStore for PostgresActivationStore { /// Mark a collection of tasks as complete by id #[instrument(skip_all)] async fn mark_completed(&self, ids: Vec) -> Result { - let mut query_builder = QueryBuilder::new("UPDATE inflight_taskactivations "); + let mut query_builder = QueryBuilder::new(format!( + "UPDATE {table_name} ", + table_name = self.config.table_name + )); query_builder .push("SET status = ") .push_bind(InflightActivationStatus::Complete.to_string()) @@ -668,10 +736,13 @@ impl InflightActivationStore for PostgresActivationStore { #[instrument(skip_all)] async fn remove_completed(&self) -> Result { let mut conn = self.acquire_write_conn_metric("remove_completed").await?; - let query = sqlx::query("DELETE FROM inflight_taskactivations WHERE status = $1") - .bind(InflightActivationStatus::Complete.to_string()) - .execute(&mut *conn) - .await?; + let query = sqlx::query(&format!( + "DELETE FROM {table_name} WHERE status = $1", + table_name = self.config.table_name + )) + .bind(InflightActivationStatus::Complete.to_string()) + .execute(&mut *conn) + .await?; Ok(query.rows_affected()) } @@ -679,8 +750,10 @@ impl InflightActivationStore for PostgresActivationStore { /// Remove killswitched tasks. #[instrument(skip_all)] async fn remove_killswitched(&self, killswitched_tasks: Vec) -> Result { - let mut query_builder = - QueryBuilder::new("DELETE FROM inflight_taskactivations WHERE taskname IN ("); + let mut query_builder = QueryBuilder::new(format!( + "DELETE FROM {table_name} WHERE taskname IN (", + table_name = self.config.table_name + )); let mut separated = query_builder.separated(", "); for taskname in killswitched_tasks.iter() { separated.push_bind(taskname);