Skip to content

Make Queries Easier to Read and Modify#562

Draft
james-mcnulty wants to merge 7 commits intomainfrom
george/better-queries
Draft

Make Queries Easier to Read and Modify#562
james-mcnulty wants to merge 7 commits intomainfrom
george/better-queries

Conversation

@james-mcnulty
Copy link
Member

@james-mcnulty james-mcnulty commented Feb 23, 2026

Description

In many places, queries are built manually by gluing together strings and values with a sequence of push and push_bind calls. This makes it difficult to understand what queries are actually built and executed, and makes it easy to make mistakes when writing queries.

Although most people will never touch the queries, and they will change rarely, I believe this is a positive and low effort improvement to the codebase. Although there is now slightly more code in some areas, it is easier to understand.

Examples

Getting Pending Tasks from Namespace

Before

let now = Utc::now();

let grace_period = self.config.processing_deadline_grace_sec;

let mut query_builder = QueryBuilder::new(format!(
    "UPDATE inflight_taskactivations
    SET
        processing_deadline = unixepoch(
            'now', '+' || (processing_deadline_duration + {grace_period}) || ' seconds'
        ),
        status = "
));

query_builder.push_bind(InflightActivationStatus::Processing);

query_builder.push(
    "
    WHERE id IN (
        SELECT id
        FROM inflight_taskactivations
        WHERE status = ",
);

query_builder.push_bind(InflightActivationStatus::Pending);
query_builder.push(" AND (expires_at IS NULL OR expires_at > ");
query_builder.push_bind(now.timestamp());
query_builder.push(")");

// Handle application & namespace filtering
if let Some(value) = application {
    query_builder.push(" AND application =");
    query_builder.push_bind(value);
}

if let Some(namespaces) = namespaces
    && !namespaces.is_empty()
{
    query_builder.push(" AND namespace IN (");
    let mut separated = query_builder.separated(", ");
    for namespace in namespaces.iter() {
        separated.push_bind(namespace);
    }
    query_builder.push(")");
}

query_builder.push(" ORDER BY added_at");

if let Some(limit) = limit {
    query_builder.push(" LIMIT ");
    query_builder.push_bind(limit);
}

query_builder.push(") RETURNING *");

let mut conn = self
    .acquire_write_conn_metric("get_pending_activation")
    .await?;

let rows: Vec<TableRow> = query_builder
    .build_query_as::<TableRow>()
    .fetch_all(&mut *conn)
    .await?;

Ok(rows.into_iter().map(|row| row.into()).collect())

After

let now = Utc::now();

let grace_period = self.config.processing_deadline_grace_sec;

let application_filter = application
    .map(|_| " AND application = ?")
    .unwrap_or_default();

let namespace_filter = namespaces
    .filter(|ns| !ns.is_empty())
    .map(|ns| ns.iter().map(|_| "?").collect::<Vec<_>>().join(", "))
    .map(|placeholders| format!(" AND namespace IN ({placeholders})"))
    .unwrap_or_default();

let limit_clause = limit.map(|_| " LIMIT ?").unwrap_or_default();

let sql = format!(
    "UPDATE inflight_taskactivations
     SET
         processing_deadline = unixepoch(
             'now', '+' || (processing_deadline_duration + {grace_period}) || ' seconds'
         ),
         status = ?
     WHERE id IN (
         SELECT id
         FROM inflight_taskactivations
         WHERE status = ?
           AND (expires_at IS NULL OR expires_at > ?)
           {application_filter}
           {namespace_filter}
         ORDER BY added_at
         {limit_clause}
     )
     RETURNING *"
);

// Bind values in the same order they appear in the query
let mut query = sqlx::query_as::<_, TableRow>(&sql)
    .bind(InflightActivationStatus::Processing)
    .bind(InflightActivationStatus::Pending)
    .bind(now.timestamp());

if let Some(value) = application {
    query = query.bind(value);
}

if let Some(values) = namespaces.filter(|n| !n.is_empty()) {
    for namespace in values {
        query = query.bind(namespace);
    }
}

if let Some(value) = limit {
    query = query.bind(value);
}

let mut conn = self
    .acquire_write_conn_metric("get_pending_activation")
    .await?;

let rows: Vec<TableRow> = query.fetch_all(&mut *conn).await?;

Ok(rows.into_iter().map(|row| row.into()).collect())

Marking Tasks to Discard as Completed

Before

let mut query_builder = QueryBuilder::new("UPDATE inflight_taskactivations ");

query_builder
    .push("SET status = ")
    .push_bind(InflightActivationStatus::Complete)
    .push(" WHERE id IN (");

let mut separated = query_builder.separated(", ");

for (id, _body) in forwarder.to_discard.iter() {
    separated.push_bind(id);
}

separated.push_unseparated(")");

query_builder.build().execute(&mut *atomic).await?;

After

let placeholders = forwarder
    .to_discard
    .iter()
    .map(|_| "?")
    .collect::<Vec<_>>()
    .join(", ");

let sql = format!(
    "UPDATE inflight_taskactivations
     SET status = ?
     WHERE id in ({placeholders})"
);

let mut query = sqlx::query::<Sqlite>(&sql).bind(InflightActivationStatus::Complete);

for (id, _) in forwarder.to_discard.iter() {
    query = query.bind(id);
}

query.execute(&mut *atomic).await?;

Copy link
Member

@evanh evanh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah this style looks good to me!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants