From f710726ce7fc9c3b8d705ee30972833211b60e90 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Mon, 23 Feb 2026 14:57:52 -0800 Subject: [PATCH 1/7] Improve Readability of `get_pending_activations_from_namespaces` --- src/store/inflight_activation.rs | 86 ++++++++++++++++++-------------- 1 file changed, 48 insertions(+), 38 deletions(-) diff --git a/src/store/inflight_activation.rs b/src/store/inflight_activation.rs index 9de6bf2b..572ee033 100644 --- a/src/store/inflight_activation.rs +++ b/src/store/inflight_activation.rs @@ -813,56 +813,66 @@ impl InflightActivationStore for SqliteActivationStore { let now = Utc::now(); let grace_period = self.config.processing_deadline_grace_sec; - let mut query_builder = QueryBuilder::new(format!( + + let application_filter = application + .map(|_| " AND application = ?") + .unwrap_or_default(); + + let namespace_filter = namespaces + .filter(|n| !n.is_empty()) + .map(|n| { + let placeholders = n.iter().map(|_| "?").collect::>().join(", "); + format!(" AND namespace IN ({placeholders})") + }) + .unwrap_or_default(); + + let limit_clause = limit.map(|_| " LIMIT ?").unwrap_or_default(); + + let sql = format!( "UPDATE inflight_taskactivations - SET - processing_deadline = unixepoch( - 'now', '+' || (processing_deadline_duration + {grace_period}) || ' seconds' - ), - status = " - )); - query_builder.push_bind(InflightActivationStatus::Processing); - query_builder.push( - " - WHERE id IN ( - SELECT id - FROM inflight_taskactivations - WHERE status = ", + SET + processing_deadline = unixepoch( + 'now', '+' || (processing_deadline_duration + {grace_period}) || ' seconds' + ), + status = ? + WHERE id IN ( + SELECT id + FROM inflight_taskactivations + WHERE status = ? + AND (expires_at IS NULL OR expires_at > ?) + {application_filter} + {namespace_filter} + ORDER BY added_at + {limit_clause} + ) + RETURNING *" ); - query_builder.push_bind(InflightActivationStatus::Pending); - query_builder.push(" AND (expires_at IS NULL OR expires_at > "); - query_builder.push_bind(now.timestamp()); - query_builder.push(")"); - // Handle application & namespace filtering + // Bind values in the same order they appear in the query + let mut query = sqlx::query_as::<_, TableRow>(&sql) + .bind(InflightActivationStatus::Processing) + .bind(InflightActivationStatus::Pending) + .bind(now.timestamp()); + if let Some(value) = application { - query_builder.push(" AND application ="); - query_builder.push_bind(value); + query = query.bind(value); } - if let Some(namespaces) = namespaces - && !namespaces.is_empty() - { - query_builder.push(" AND namespace IN ("); - let mut separated = query_builder.separated(", "); - for namespace in namespaces.iter() { - separated.push_bind(namespace); + + if let Some(values) = namespaces.filter(|n| !n.is_empty()) { + for namespace in values { + query = query.bind(namespace); } - query_builder.push(")"); } - query_builder.push(" ORDER BY added_at"); - if let Some(limit) = limit { - query_builder.push(" LIMIT "); - query_builder.push_bind(limit); + + if let Some(value) = limit { + query = query.bind(value); } - query_builder.push(") RETURNING *"); let mut conn = self .acquire_write_conn_metric("get_pending_activation") .await?; - let rows: Vec = query_builder - .build_query_as::() - .fetch_all(&mut *conn) - .await?; + + let rows: Vec = query.fetch_all(&mut *conn).await?; Ok(rows.into_iter().map(|row| row.into()).collect()) } From 480df3ce938d2617adf1a308980eee86aa50aa41 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Mon, 23 Feb 2026 15:08:59 -0800 Subject: [PATCH 2/7] Use Maps Because Functional Programming is Cool --- src/store/inflight_activation.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/store/inflight_activation.rs b/src/store/inflight_activation.rs index 572ee033..12578daf 100644 --- a/src/store/inflight_activation.rs +++ b/src/store/inflight_activation.rs @@ -820,10 +820,8 @@ impl InflightActivationStore for SqliteActivationStore { let namespace_filter = namespaces .filter(|n| !n.is_empty()) - .map(|n| { - let placeholders = n.iter().map(|_| "?").collect::>().join(", "); - format!(" AND namespace IN ({placeholders})") - }) + .map(|n| n.iter().map(|_| "?").collect::>().join(", ")) + .map(|n| format!(" AND namespace IN ({n})")) .unwrap_or_default(); let limit_clause = limit.map(|_| " LIMIT ?").unwrap_or_default(); From ef7e3915e077060f15335e8c81dc9f78ab17cde0 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Mon, 23 Feb 2026 15:10:01 -0800 Subject: [PATCH 3/7] Improve Variable Names --- src/store/inflight_activation.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/store/inflight_activation.rs b/src/store/inflight_activation.rs index 12578daf..65ce06cc 100644 --- a/src/store/inflight_activation.rs +++ b/src/store/inflight_activation.rs @@ -819,9 +819,9 @@ impl InflightActivationStore for SqliteActivationStore { .unwrap_or_default(); let namespace_filter = namespaces - .filter(|n| !n.is_empty()) - .map(|n| n.iter().map(|_| "?").collect::>().join(", ")) - .map(|n| format!(" AND namespace IN ({n})")) + .filter(|ns| !ns.is_empty()) + .map(|ns| ns.iter().map(|_| "?").collect::>().join(", ")) + .map(|placeholders| format!(" AND namespace IN ({placeholders})")) .unwrap_or_default(); let limit_clause = limit.map(|_| " LIMIT ?").unwrap_or_default(); From 64287587dab2b612efc31629c1835df2e2f46500 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Mon, 23 Feb 2026 15:32:45 -0800 Subject: [PATCH 4/7] Improve `handle_failed_tasks` --- src/store/inflight_activation.rs | 30 +++++++++++++++++++----------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/src/store/inflight_activation.rs b/src/store/inflight_activation.rs index 65ce06cc..1ad7f72f 100644 --- a/src/store/inflight_activation.rs +++ b/src/store/inflight_activation.rs @@ -1179,19 +1179,27 @@ impl InflightActivationStore for SqliteActivationStore { } if !forwarder.to_discard.is_empty() { - let mut query_builder = QueryBuilder::new("UPDATE inflight_taskactivations "); - query_builder - .push("SET status = ") - .push_bind(InflightActivationStatus::Complete) - .push(" WHERE id IN ("); - - let mut separated = query_builder.separated(", "); - for (id, _body) in forwarder.to_discard.iter() { - separated.push_bind(id); + let placeholders = forwarder + .to_discard + .iter() + .map(|_| "?") + .collect::>() + .join(", "); + + let sql = format!( + " + UPDATE inflight_taskactivations + SET status = ? + WHERE id in ({placeholders})" + ); + + let mut query = sqlx::query::(&sql).bind(InflightActivationStatus::Complete); + + for (id, _) in forwarder.to_discard.iter() { + query = query.bind(id); } - separated.push_unseparated(")"); - query_builder.build().execute(&mut *atomic).await?; + query.execute(&mut *atomic).await?; } atomic.commit().await?; From e424a9e18d883ffa853a0fb745df819367917c5a Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Mon, 23 Feb 2026 15:40:25 -0800 Subject: [PATCH 5/7] Fix Query String Alignment --- src/store/inflight_activation.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/store/inflight_activation.rs b/src/store/inflight_activation.rs index 1ad7f72f..86d4a3e1 100644 --- a/src/store/inflight_activation.rs +++ b/src/store/inflight_activation.rs @@ -1187,10 +1187,9 @@ impl InflightActivationStore for SqliteActivationStore { .join(", "); let sql = format!( - " - UPDATE inflight_taskactivations - SET status = ? - WHERE id in ({placeholders})" + "UPDATE inflight_taskactivations + SET status = ? + WHERE id in ({placeholders})" ); let mut query = sqlx::query::(&sql).bind(InflightActivationStatus::Complete); From 4d45c0776faeb4fe19bd8380ad72100bed20869e Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Mon, 23 Feb 2026 15:52:21 -0800 Subject: [PATCH 6/7] Improve `mark_completed` --- src/store/inflight_activation.rs | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/src/store/inflight_activation.rs b/src/store/inflight_activation.rs index 86d4a3e1..dd96e79b 100644 --- a/src/store/inflight_activation.rs +++ b/src/store/inflight_activation.rs @@ -1189,7 +1189,7 @@ impl InflightActivationStore for SqliteActivationStore { let sql = format!( "UPDATE inflight_taskactivations SET status = ? - WHERE id in ({placeholders})" + WHERE id IN ({placeholders})" ); let mut query = sqlx::query::(&sql).bind(InflightActivationStatus::Complete); @@ -1209,19 +1209,22 @@ impl InflightActivationStore for SqliteActivationStore { /// Mark a collection of tasks as complete by id #[instrument(skip_all)] async fn mark_completed(&self, ids: Vec) -> Result { - let mut query_builder = QueryBuilder::new("UPDATE inflight_taskactivations "); - query_builder - .push("SET status = ") - .push_bind(InflightActivationStatus::Complete) - .push(" WHERE id IN ("); + let placeholders = ids.iter().map(|_| "?").collect::>().join(", "); - let mut separated = query_builder.separated(", "); - for id in ids.iter() { - separated.push_bind(id); + let sql = format!( + "UPDATE inflight_taskactivations + SET status = ? + WHERE id IN ({placeholders})" + ); + + let mut query = sqlx::query::(&sql).bind(InflightActivationStatus::Complete); + + for id in ids { + query = query.bind(id); } - separated.push_unseparated(")"); + let mut conn = self.acquire_write_conn_metric("mark_completed").await?; - let result = query_builder.build().execute(&mut *conn).await?; + let result = query.execute(&mut *conn).await?; Ok(result.rows_affected()) } From 5ad222c2eb0c8e87abc95bc8bf27bd0a12d4bbc4 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Mon, 23 Feb 2026 16:00:53 -0800 Subject: [PATCH 7/7] Minor `store` Improvement --- src/store/inflight_activation.rs | 36 +++++++++++++++++++------------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/src/store/inflight_activation.rs b/src/store/inflight_activation.rs index dd96e79b..1beb294f 100644 --- a/src/store/inflight_activation.rs +++ b/src/store/inflight_activation.rs @@ -760,12 +760,7 @@ impl InflightActivationStore for SqliteActivationStore { b.push_bind(row.expires_at.map(|t| Some(t.timestamp()))); b.push_bind(row.delay_until.map(|t| Some(t.timestamp()))); b.push_bind(row.processing_deadline_duration); - if let Some(deadline) = row.processing_deadline { - b.push_bind(deadline.timestamp()); - } else { - // Add a literal null - b.push("null"); - } + b.push_bind(row.processing_deadline.map(|t| t.timestamp())); // None turns into NULL b.push_bind(row.status); b.push_bind(row.at_most_once); b.push_bind(row.application); @@ -847,7 +842,7 @@ impl InflightActivationStore for SqliteActivationStore { ); // Bind values in the same order they appear in the query - let mut query = sqlx::query_as::<_, TableRow>(&sql) + let mut query = sqlx::query_as::(&sql) .bind(InflightActivationStatus::Processing) .bind(InflightActivationStatus::Pending) .bind(now.timestamp()); @@ -1245,18 +1240,29 @@ impl InflightActivationStore for SqliteActivationStore { /// Remove killswitched tasks. #[instrument(skip_all)] async fn remove_killswitched(&self, killswitched_tasks: Vec) -> Result { - let mut query_builder = - QueryBuilder::new("DELETE FROM inflight_taskactivations WHERE taskname IN ("); - let mut separated = query_builder.separated(", "); - for taskname in killswitched_tasks.iter() { - separated.push_bind(taskname); + let placeholders = killswitched_tasks + .iter() + .map(|_| "?") + .collect::>() + .join(", "); + + let sql = format!( + "DELETE FROM inflight_taskactivations + WHERE taskname IN ({placeholders})" + ); + + let mut query = sqlx::query::(&sql); + + for taskname in killswitched_tasks { + query = query.bind(taskname); } - separated.push_unseparated(")"); + let mut conn = self .acquire_write_conn_metric("remove_killswitched") .await?; - let query = query_builder.build().execute(&mut *conn).await?; - Ok(query.rows_affected()) + let result = query.execute(&mut *conn).await?; + + Ok(result.rows_affected()) } }