diff --git a/src/store/inflight_activation.rs b/src/store/inflight_activation.rs index 9de6bf2b..1beb294f 100644 --- a/src/store/inflight_activation.rs +++ b/src/store/inflight_activation.rs @@ -760,12 +760,7 @@ impl InflightActivationStore for SqliteActivationStore { b.push_bind(row.expires_at.map(|t| Some(t.timestamp()))); b.push_bind(row.delay_until.map(|t| Some(t.timestamp()))); b.push_bind(row.processing_deadline_duration); - if let Some(deadline) = row.processing_deadline { - b.push_bind(deadline.timestamp()); - } else { - // Add a literal null - b.push("null"); - } + b.push_bind(row.processing_deadline.map(|t| t.timestamp())); // None turns into NULL b.push_bind(row.status); b.push_bind(row.at_most_once); b.push_bind(row.application); @@ -813,56 +808,64 @@ impl InflightActivationStore for SqliteActivationStore { let now = Utc::now(); let grace_period = self.config.processing_deadline_grace_sec; - let mut query_builder = QueryBuilder::new(format!( + + let application_filter = application + .map(|_| " AND application = ?") + .unwrap_or_default(); + + let namespace_filter = namespaces + .filter(|ns| !ns.is_empty()) + .map(|ns| ns.iter().map(|_| "?").collect::>().join(", ")) + .map(|placeholders| format!(" AND namespace IN ({placeholders})")) + .unwrap_or_default(); + + let limit_clause = limit.map(|_| " LIMIT ?").unwrap_or_default(); + + let sql = format!( "UPDATE inflight_taskactivations - SET - processing_deadline = unixepoch( - 'now', '+' || (processing_deadline_duration + {grace_period}) || ' seconds' - ), - status = " - )); - query_builder.push_bind(InflightActivationStatus::Processing); - query_builder.push( - " - WHERE id IN ( - SELECT id - FROM inflight_taskactivations - WHERE status = ", + SET + processing_deadline = unixepoch( + 'now', '+' || (processing_deadline_duration + {grace_period}) || ' seconds' + ), + status = ? + WHERE id IN ( + SELECT id + FROM inflight_taskactivations + WHERE status = ? + AND (expires_at IS NULL OR expires_at > ?) + {application_filter} + {namespace_filter} + ORDER BY added_at + {limit_clause} + ) + RETURNING *" ); - query_builder.push_bind(InflightActivationStatus::Pending); - query_builder.push(" AND (expires_at IS NULL OR expires_at > "); - query_builder.push_bind(now.timestamp()); - query_builder.push(")"); - // Handle application & namespace filtering + // Bind values in the same order they appear in the query + let mut query = sqlx::query_as::(&sql) + .bind(InflightActivationStatus::Processing) + .bind(InflightActivationStatus::Pending) + .bind(now.timestamp()); + if let Some(value) = application { - query_builder.push(" AND application ="); - query_builder.push_bind(value); + query = query.bind(value); } - if let Some(namespaces) = namespaces - && !namespaces.is_empty() - { - query_builder.push(" AND namespace IN ("); - let mut separated = query_builder.separated(", "); - for namespace in namespaces.iter() { - separated.push_bind(namespace); + + if let Some(values) = namespaces.filter(|n| !n.is_empty()) { + for namespace in values { + query = query.bind(namespace); } - query_builder.push(")"); } - query_builder.push(" ORDER BY added_at"); - if let Some(limit) = limit { - query_builder.push(" LIMIT "); - query_builder.push_bind(limit); + + if let Some(value) = limit { + query = query.bind(value); } - query_builder.push(") RETURNING *"); let mut conn = self .acquire_write_conn_metric("get_pending_activation") .await?; - let rows: Vec = query_builder - .build_query_as::() - .fetch_all(&mut *conn) - .await?; + + let rows: Vec = query.fetch_all(&mut *conn).await?; Ok(rows.into_iter().map(|row| row.into()).collect()) } @@ -1171,19 +1174,26 @@ impl InflightActivationStore for SqliteActivationStore { } if !forwarder.to_discard.is_empty() { - let mut query_builder = QueryBuilder::new("UPDATE inflight_taskactivations "); - query_builder - .push("SET status = ") - .push_bind(InflightActivationStatus::Complete) - .push(" WHERE id IN ("); - - let mut separated = query_builder.separated(", "); - for (id, _body) in forwarder.to_discard.iter() { - separated.push_bind(id); + let placeholders = forwarder + .to_discard + .iter() + .map(|_| "?") + .collect::>() + .join(", "); + + let sql = format!( + "UPDATE inflight_taskactivations + SET status = ? + WHERE id IN ({placeholders})" + ); + + let mut query = sqlx::query::(&sql).bind(InflightActivationStatus::Complete); + + for (id, _) in forwarder.to_discard.iter() { + query = query.bind(id); } - separated.push_unseparated(")"); - query_builder.build().execute(&mut *atomic).await?; + query.execute(&mut *atomic).await?; } atomic.commit().await?; @@ -1194,19 +1204,22 @@ impl InflightActivationStore for SqliteActivationStore { /// Mark a collection of tasks as complete by id #[instrument(skip_all)] async fn mark_completed(&self, ids: Vec) -> Result { - let mut query_builder = QueryBuilder::new("UPDATE inflight_taskactivations "); - query_builder - .push("SET status = ") - .push_bind(InflightActivationStatus::Complete) - .push(" WHERE id IN ("); - - let mut separated = query_builder.separated(", "); - for id in ids.iter() { - separated.push_bind(id); + let placeholders = ids.iter().map(|_| "?").collect::>().join(", "); + + let sql = format!( + "UPDATE inflight_taskactivations + SET status = ? + WHERE id IN ({placeholders})" + ); + + let mut query = sqlx::query::(&sql).bind(InflightActivationStatus::Complete); + + for id in ids { + query = query.bind(id); } - separated.push_unseparated(")"); + let mut conn = self.acquire_write_conn_metric("mark_completed").await?; - let result = query_builder.build().execute(&mut *conn).await?; + let result = query.execute(&mut *conn).await?; Ok(result.rows_affected()) } @@ -1227,18 +1240,29 @@ impl InflightActivationStore for SqliteActivationStore { /// Remove killswitched tasks. #[instrument(skip_all)] async fn remove_killswitched(&self, killswitched_tasks: Vec) -> Result { - let mut query_builder = - QueryBuilder::new("DELETE FROM inflight_taskactivations WHERE taskname IN ("); - let mut separated = query_builder.separated(", "); - for taskname in killswitched_tasks.iter() { - separated.push_bind(taskname); + let placeholders = killswitched_tasks + .iter() + .map(|_| "?") + .collect::>() + .join(", "); + + let sql = format!( + "DELETE FROM inflight_taskactivations + WHERE taskname IN ({placeholders})" + ); + + let mut query = sqlx::query::(&sql); + + for taskname in killswitched_tasks { + query = query.bind(taskname); } - separated.push_unseparated(")"); + let mut conn = self .acquire_write_conn_metric("remove_killswitched") .await?; - let query = query_builder.build().execute(&mut *conn).await?; - Ok(query.rows_affected()) + let result = query.execute(&mut *conn).await?; + + Ok(result.rows_affected()) } }