Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 98 additions & 74 deletions src/store/inflight_activation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -760,12 +760,7 @@ impl InflightActivationStore for SqliteActivationStore {
b.push_bind(row.expires_at.map(|t| Some(t.timestamp())));
b.push_bind(row.delay_until.map(|t| Some(t.timestamp())));
b.push_bind(row.processing_deadline_duration);
if let Some(deadline) = row.processing_deadline {
b.push_bind(deadline.timestamp());
} else {
// Add a literal null
b.push("null");
}
b.push_bind(row.processing_deadline.map(|t| t.timestamp())); // None turns into NULL
b.push_bind(row.status);
b.push_bind(row.at_most_once);
b.push_bind(row.application);
Expand Down Expand Up @@ -813,56 +808,64 @@ impl InflightActivationStore for SqliteActivationStore {
let now = Utc::now();

let grace_period = self.config.processing_deadline_grace_sec;
let mut query_builder = QueryBuilder::new(format!(

let application_filter = application
.map(|_| " AND application = ?")
.unwrap_or_default();

let namespace_filter = namespaces
.filter(|ns| !ns.is_empty())
.map(|ns| ns.iter().map(|_| "?").collect::<Vec<_>>().join(", "))
.map(|placeholders| format!(" AND namespace IN ({placeholders})"))
.unwrap_or_default();

let limit_clause = limit.map(|_| " LIMIT ?").unwrap_or_default();

let sql = format!(
"UPDATE inflight_taskactivations
SET
processing_deadline = unixepoch(
'now', '+' || (processing_deadline_duration + {grace_period}) || ' seconds'
),
status = "
));
query_builder.push_bind(InflightActivationStatus::Processing);
query_builder.push(
"
WHERE id IN (
SELECT id
FROM inflight_taskactivations
WHERE status = ",
SET
processing_deadline = unixepoch(
'now', '+' || (processing_deadline_duration + {grace_period}) || ' seconds'
),
status = ?
WHERE id IN (
SELECT id
FROM inflight_taskactivations
WHERE status = ?
AND (expires_at IS NULL OR expires_at > ?)
{application_filter}
{namespace_filter}
ORDER BY added_at
{limit_clause}
)
RETURNING *"
);
query_builder.push_bind(InflightActivationStatus::Pending);
query_builder.push(" AND (expires_at IS NULL OR expires_at > ");
query_builder.push_bind(now.timestamp());
query_builder.push(")");

// Handle application & namespace filtering
// Bind values in the same order they appear in the query
let mut query = sqlx::query_as::<Sqlite, TableRow>(&sql)
.bind(InflightActivationStatus::Processing)
.bind(InflightActivationStatus::Pending)
.bind(now.timestamp());

if let Some(value) = application {
query_builder.push(" AND application =");
query_builder.push_bind(value);
query = query.bind(value);
}
if let Some(namespaces) = namespaces
&& !namespaces.is_empty()
{
query_builder.push(" AND namespace IN (");
let mut separated = query_builder.separated(", ");
for namespace in namespaces.iter() {
separated.push_bind(namespace);

if let Some(values) = namespaces.filter(|n| !n.is_empty()) {
for namespace in values {
query = query.bind(namespace);
}
query_builder.push(")");
}
query_builder.push(" ORDER BY added_at");
if let Some(limit) = limit {
query_builder.push(" LIMIT ");
query_builder.push_bind(limit);

if let Some(value) = limit {
query = query.bind(value);
}
query_builder.push(") RETURNING *");

let mut conn = self
.acquire_write_conn_metric("get_pending_activation")
.await?;
let rows: Vec<TableRow> = query_builder
.build_query_as::<TableRow>()
.fetch_all(&mut *conn)
.await?;

let rows: Vec<TableRow> = query.fetch_all(&mut *conn).await?;

Ok(rows.into_iter().map(|row| row.into()).collect())
}
Expand Down Expand Up @@ -1171,19 +1174,26 @@ impl InflightActivationStore for SqliteActivationStore {
}

if !forwarder.to_discard.is_empty() {
let mut query_builder = QueryBuilder::new("UPDATE inflight_taskactivations ");
query_builder
.push("SET status = ")
.push_bind(InflightActivationStatus::Complete)
.push(" WHERE id IN (");

let mut separated = query_builder.separated(", ");
for (id, _body) in forwarder.to_discard.iter() {
separated.push_bind(id);
let placeholders = forwarder
.to_discard
.iter()
.map(|_| "?")
.collect::<Vec<_>>()
.join(", ");

let sql = format!(
"UPDATE inflight_taskactivations
SET status = ?
WHERE id IN ({placeholders})"
);

let mut query = sqlx::query::<Sqlite>(&sql).bind(InflightActivationStatus::Complete);

for (id, _) in forwarder.to_discard.iter() {
query = query.bind(id);
}
separated.push_unseparated(")");

query_builder.build().execute(&mut *atomic).await?;
query.execute(&mut *atomic).await?;
}

atomic.commit().await?;
Expand All @@ -1194,19 +1204,22 @@ impl InflightActivationStore for SqliteActivationStore {
/// Mark a collection of tasks as complete by id
#[instrument(skip_all)]
async fn mark_completed(&self, ids: Vec<String>) -> Result<u64, Error> {
let mut query_builder = QueryBuilder::new("UPDATE inflight_taskactivations ");
query_builder
.push("SET status = ")
.push_bind(InflightActivationStatus::Complete)
.push(" WHERE id IN (");

let mut separated = query_builder.separated(", ");
for id in ids.iter() {
separated.push_bind(id);
let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(", ");

let sql = format!(
"UPDATE inflight_taskactivations
SET status = ?
WHERE id IN ({placeholders})"
);

let mut query = sqlx::query::<Sqlite>(&sql).bind(InflightActivationStatus::Complete);

for id in ids {
query = query.bind(id);
}
separated.push_unseparated(")");

let mut conn = self.acquire_write_conn_metric("mark_completed").await?;
let result = query_builder.build().execute(&mut *conn).await?;
let result = query.execute(&mut *conn).await?;

Ok(result.rows_affected())
}
Expand All @@ -1227,18 +1240,29 @@ impl InflightActivationStore for SqliteActivationStore {
/// Remove killswitched tasks.
#[instrument(skip_all)]
async fn remove_killswitched(&self, killswitched_tasks: Vec<String>) -> Result<u64, Error> {
let mut query_builder =
QueryBuilder::new("DELETE FROM inflight_taskactivations WHERE taskname IN (");
let mut separated = query_builder.separated(", ");
for taskname in killswitched_tasks.iter() {
separated.push_bind(taskname);
let placeholders = killswitched_tasks
.iter()
.map(|_| "?")
.collect::<Vec<_>>()
.join(", ");

let sql = format!(
"DELETE FROM inflight_taskactivations
WHERE taskname IN ({placeholders})"
);

let mut query = sqlx::query::<Sqlite>(&sql);

for taskname in killswitched_tasks {
query = query.bind(taskname);
}
separated.push_unseparated(")");

let mut conn = self
.acquire_write_conn_metric("remove_killswitched")
.await?;
let query = query_builder.build().execute(&mut *conn).await?;

Ok(query.rows_affected())
let result = query.execute(&mut *conn).await?;

Ok(result.rows_affected())
}
}
Loading