Commit 684ade3

Implement terminating jobs pipeline (#3643)
* Move terminating jobs logic from services to background task
* Simplify `_process_terminating_job()` code
* Implement `JobTerminatingPipeline` scaffolding
* Implement `JobTerminatingWorker`
* Minor refactoring
* Rename pipeline_tasks `terminating_jobs.py` to `jobs_terminating.py`
* Wire pipeline
* Use SELECT FOR UPDATE OF
* Add contributing/PIPELINES.md
* Set job-specific `lock_owner`
* Decrease `min_processing_interval`
* Rebase migration
* Add `ix_jobs_pipeline_fetch_q` index
* Add deprecated note
* Expand PIPELINES.md
1 parent a357347 commit 684ade3

File tree

15 files changed: +2319 −395 lines

contributing/LOCKING.md

Lines changed: 10 additions & 7 deletions

@@ -1,7 +1,6 @@
 # Locking

-The `dstack` server supports SQLite and Postgres databases
-with two implementations of resource locking to handle concurrent access:
+The `dstack` server supports SQLite and Postgres databases with two implementations of resource locking to handle concurrent access:

 * In-memory locking for SQLite.
 * DB-level locking for Postgres.
@@ -34,11 +33,11 @@ There are few places that rely on advisory locks as when generating unique resou

 ## Working with locks

-Concurrency is hard. Below you'll find common patterns and gotchas when working with locks to make it a bit more manageable.
+Concurrency is hard. Concurrency with locking is especially hard. Below you'll find common patterns and gotchas when working with locks to make it a bit more manageable.

 **A task should acquire locks on resources it modifies**

-This is a common sense approach. An alternative could be the inverse: job processing cannot run in parallel with run processing, so job processing takes the run lock. This indirection complicates things and is discouraged. In this example, run processing should take the job lock instead.
+This is common sense. An alternative could be the inverse: job processing cannot run in parallel with run processing, so job processing takes the run lock. This indirection complicates things and is discouraged. In this example, run processing should take the job lock instead.

 **Start new transaction after acquiring a lock to see other transactions' changes in SQLite.**

@@ -75,15 +74,19 @@ unlock resources

 If a transaction releases a lock before committing changes, the changes may not be visible to another transaction that acquired the lock and relies upon seeing all committed changes.

-**Don't use `joinedload` when selecting `.with_for_update()`**
+**Using `joinedload` when selecting `.with_for_update()`**

-In fact, using `joinedload` and `.with_for_update()` will trigger an error because `joinedload` produces OUTER LEFT JOIN that cannot be used with SELECT FOR UPDATE. A regular `.join()` can be used to lock related resources but it may lead to no rows if there is no row to join. Usually, you'd select with `selectinload` or first select with `.with_for_update()` without loading related attributes and then re-select with `joinedload` without `.with_for_update()`.
+Using `joinedload` together with `.with_for_update()` triggers an error when there are no related rows because `joinedload` produces a LEFT OUTER JOIN, and SELECT FOR UPDATE cannot be applied to the nullable side of an OUTER JOIN. Here are the options:
+
+* Use `.with_for_update(of=MainModel)`.
+* Select with `selectinload`.
+* First select with `.with_for_update()` without loading related attributes, then re-select with `joinedload` without `.with_for_update()`.
+* Use a regular `.join()` to lock related resources, but you may get 0 rows if there is no related row to join.

 **Always use `.with_for_update(key_share=True)` unless you plan to delete rows or update a primary key column**

 If you `SELECT FOR UPDATE` from a table that is referenced in a child table via a foreign key, it can lead to deadlocks if the child table is updated because Postgres will issue a `FOR KEY SHARE` lock on the parent table rows to ensure valid foreign keys. For this reason, you should always do `SELECT FOR NO KEY UPDATE` (`.with_for_update(key_share=True)`) if primary key columns are not modified. `SELECT FOR NO KEY UPDATE` is not blocked by a `FOR KEY SHARE` lock, so no deadlock.

-
 **Lock unique names**

 The following pattern can be used to lock a unique name of some resource type:
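The `of=` option from the list above can be sketched in SQLAlchemy. This is a minimal illustration, not `dstack`'s actual models: `JobModel` and `RunModel` here are hypothetical stand-ins.

```python
# A minimal sketch of `.with_for_update(of=...)` combined with `joinedload`.
# `JobModel`/`RunModel` are hypothetical models, not dstack's actual ones.
from sqlalchemy import Column, ForeignKey, Integer, select
from sqlalchemy.dialects import postgresql
from sqlalchemy.orm import declarative_base, joinedload, relationship

Base = declarative_base()


class RunModel(Base):
    __tablename__ = "runs"
    id = Column(Integer, primary_key=True)


class JobModel(Base):
    __tablename__ = "jobs"
    id = Column(Integer, primary_key=True)
    run_id = Column(Integer, ForeignKey("runs.id"))
    run = relationship(RunModel)


# `of=JobModel` locks only the jobs rows, so the LEFT OUTER JOIN that
# `joinedload` emits at execution time no longer conflicts with FOR UPDATE.
stmt = (
    select(JobModel)
    .options(joinedload(JobModel.run))
    .with_for_update(of=JobModel, key_share=True, skip_locked=True)
)

# Rendered against the Postgres dialect the statement ends with
# "FOR NO KEY UPDATE OF jobs SKIP LOCKED".
sql = str(stmt.compile(dialect=postgresql.dialect()))
```

Note that `joinedload` takes effect only when the statement is executed via a `Session`; the plain `compile()` above just shows the locking clause.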

contributing/PIPELINES.md

Lines changed: 74 additions & 0 deletions

# Pipelines

This document describes how the `dstack` server implements background processing via so-called "pipelines".

*Historical context: `dstack` used to do all background processing via scheduled tasks. A scheduled task would process a specific resource type, such as volumes or runs, keeping a DB transaction open for the entire processing duration and holding the resource lock with SELECT FOR UPDATE (or an in-memory lock on SQLite). This approach didn't scale well because the number of DB connections was a huge bottleneck. Pipelines replaced scheduled tasks: they do all the heavy processing outside of DB transactions and write locks to DB columns.*
## Overview

* Resources are continuously processed in the background by pipelines. A pipeline consists of a fetcher, workers, and a heartbeater.
* A fetcher selects rows to be processed from the DB, marks them as locked in the DB, and puts them into an in-memory queue.
* Workers consume rows from the in-memory queue, process the rows, and unlock them.
* The locking (unlocking) is done by setting (unsetting) `lock_expires_at`, `lock_token`, and `lock_owner`.
* If the replica/pipeline dies, the rows stay locked in the DB. Another replica picks up the rows after `lock_expires_at`.
* `lock_token` prevents a stale replica/pipeline from updating rows already picked up by a new replica.
* `lock_owner` stores the pipeline that locked the row so that only that pipeline can recover the row if the lock is stale.
* A heartbeater tracks all rows in the pipeline (in the queue or in processing) and updates the lock expiration. This allows setting a small `lock_expires_at` and picking up stale rows quickly.
* A fetcher performs the fetch when the queue size goes under a configured lower limit. It has exponential retry delays between empty fetches, thus reducing load on the DB.
* There is a fetch hint mechanism that services can use to notify the pipelines within the replica – in that case the fetcher stops sleeping and fetches immediately.
* Each pipeline locks one main resource but may lock related resources as well. It's not necessary to heartbeat related resources if the pipeline ensures no one else can re-lock them. This is typically done by setting and respecting `lock_owner`.

Related notes:

* All write APIs must respect DB-level locks. The endpoints can either try to acquire the lock with a timeout and error out, or provide an async API by storing the request in the DB.
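The lock columns and fetch eligibility rules above can be sketched with plain SQL. This is a toy sqlite3 model of the scheme, not `dstack`'s actual schema; on Postgres the SELECT would additionally take row locks with `skip_locked`.

```python
# Toy model of pipeline lock columns: a row is eligible for fetching when its
# lock is absent/expired and its owner is absent or this pipeline.
import sqlite3
import time
import uuid

PIPELINE = "JobTerminatingPipeline"

conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE jobs (
        id INTEGER PRIMARY KEY,
        status TEXT,
        lock_expires_at REAL,   -- NULL or in the past => lock absent/expired
        lock_token TEXT,        -- fencing token for guarded updates
        lock_owner TEXT,        -- NULL or this pipeline => eligible
        last_processed_at REAL
    )"""
)
conn.executemany(
    "INSERT INTO jobs (id, status, lock_expires_at, lock_owner, last_processed_at)"
    " VALUES (?, ?, ?, ?, ?)",
    [
        (1, "terminating", None, None, 0.0),                  # free: eligible
        (2, "terminating", time.time() + 60, "Other", 0.0),   # held elsewhere
        (3, "terminating", time.time() - 60, PIPELINE, 0.0),  # expired: eligible
    ],
)


def fetch_and_lock(limit: int) -> list:
    """Select eligible rows and mark them locked by setting the lock columns."""
    now = time.time()
    rows = conn.execute(
        """SELECT id FROM jobs
           WHERE status = 'terminating'
             AND (lock_expires_at IS NULL OR lock_expires_at < ?)
             AND (lock_owner IS NULL OR lock_owner = ?)
           ORDER BY last_processed_at ASC
           LIMIT ?""",
        (now, PIPELINE, limit),
    ).fetchall()
    ids = [r[0] for r in rows]
    for job_id in ids:
        conn.execute(
            "UPDATE jobs SET lock_expires_at = ?, lock_token = ?, lock_owner = ?"
            " WHERE id = ?",
            (now + 30, uuid.uuid4().hex, PIPELINE, job_id),
        )
    return ids


locked = fetch_and_lock(limit=10)  # rows 1 and 3 are eligible, row 2 is not
```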
## Implementation checklist

Brief checklist for implementing a new pipeline:

1. The fetcher locks only rows that are ready for processing: `status`/time filters, `lock_expires_at` is empty or expired, and `lock_owner` is empty or equal to the pipeline name. Keep the fetch order stable with `last_processed_at`.
2. The fetcher takes row locks with `skip_locked` and updates `lock_expires_at`, `lock_token`, and `lock_owner` before enqueueing items.
3. The worker keeps heavy work outside DB sessions. DB sessions should be short and used only for refetching/locking and the final apply.
4. The apply stage updates rows using update maps/update rows, not by relying on mutating detached ORM models.
5. The main apply update is guarded by `id + lock_token`. If the update affects 0 rows, the item is stale and processing results must not be applied.
6. A successful apply updates `last_processed_at` and unlocks resources that were locked by this item.
7. If a related lock is unavailable, reset the main lock for retry: keep `lock_owner`, clear `lock_token` and `lock_expires_at`, and set `last_processed_at` to now.
8. Register the pipeline in `PipelineManager` and hint fetch from services after commit via `pipeline_hinter.hint_fetch(Model.__name__)`.
9. Add minimum tests: fetch eligibility/order, the successful unlock path, the stale lock token path, and the related lock contention retry path.
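The fetch hint mechanism (item 8 above) can be sketched with an `asyncio.Event` per model. The `PipelineHinter` class below is illustrative, not `dstack`'s actual implementation:

```python
# Toy fetch hinter: services signal a pipeline's fetcher to skip its backoff sleep.
import asyncio


class PipelineHinter:
    def __init__(self) -> None:
        self._events: dict = {}

    def _event(self, model_name: str) -> asyncio.Event:
        return self._events.setdefault(model_name, asyncio.Event())

    def hint_fetch(self, model_name: str) -> None:
        # Called by services after commit: wake up the fetcher immediately.
        self._event(model_name).set()

    async def wait_for_hint(self, model_name: str, timeout: float) -> bool:
        # Called by the fetcher between fetches instead of a plain sleep.
        # Returns True if hinted, False if the backoff delay elapsed.
        event = self._event(model_name)
        try:
            await asyncio.wait_for(event.wait(), timeout)
            return True
        except asyncio.TimeoutError:
            return False
        finally:
            event.clear()


async def demo() -> bool:
    hinter = PipelineHinter()
    hinter.hint_fetch("JobModel")
    # The fetcher returns immediately instead of sleeping the full backoff.
    return await hinter.wait_for_hint("JobModel", timeout=5.0)


hinted = asyncio.run(demo())
```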
## Implementation patterns

**Guarded apply by lock token**

When writing processing results, update the main row with a filter on both `id` and `lock_token`. This guarantees that only the worker that still owns the lock can apply its results. If the update affects no rows, treat the item as stale and skip applying other changes (status changes, related updates, events). A stale item means another worker or replica has already continued processing.
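A minimal sketch of the guarded apply with sqlite3 (toy schema, not `dstack`'s actual code):

```python
# Guarded apply: the update matches only if we still hold the lock token,
# so a rowcount of 0 means the item is stale and results must be discarded.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE jobs (id INTEGER PRIMARY KEY, status TEXT, lock_token TEXT)")
conn.execute("INSERT INTO jobs VALUES (1, 'terminating', 'token-a')")


def guarded_apply(job_id: int, lock_token: str, new_status: str) -> bool:
    """Apply results only if we still own the lock; rowcount 0 means stale."""
    cur = conn.execute(
        "UPDATE jobs SET status = ?, lock_token = NULL WHERE id = ? AND lock_token = ?",
        (new_status, job_id, lock_token),
    )
    return cur.rowcount == 1


applied = guarded_apply(1, "token-a", "terminated")      # owner: applies
stale = guarded_apply(1, "token-a", "terminated_again")  # token gone: stale
```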
**Locking many related resources**

A pipeline may need to lock a potentially big set of related resources, e.g. the fleet pipeline locking all of a fleet's instances. For this, do one SELECT FOR UPDATE of non-locked instances and one SELECT to see how many instances there are, and check whether you managed to lock all of them. If you fail to lock all of them, release the main lock and try processing on another fetch iteration. You may keep `lock_owner` on the main resource or set `lock_owner` on the locked related resources and make other pipelines respect it to guarantee the eventual locking of all related resources and avoid lock starvation.
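The lock-all-or-release check can be sketched with an in-memory toy model (illustrative names, no DB):

```python
# Toy lock-all-or-release: lock every unlocked related resource, then verify
# that the locked count matches the total; otherwise release and retry later.
PIPELINE = "FleetPipeline"

# instance id -> lock_owner (None means unlocked); instance 3 is held elsewhere.
instances = {1: None, 2: None, 3: "InstancePipeline"}


def try_lock_all(fleet_instances: dict) -> bool:
    locked_ids = [i for i, owner in fleet_instances.items() if owner is None]
    for i in locked_ids:
        fleet_instances[i] = PIPELINE  # analogous to SELECT FOR UPDATE + set owner
    total = len(fleet_instances)  # the second SELECT: how many instances exist
    if sum(1 for o in fleet_instances.values() if o == PIPELINE) < total:
        # Failed to lock all: release our locks and retry on a later fetch.
        for i in locked_ids:
            fleet_instances[i] = None
        return False
    return True


ok = try_lock_all(instances)  # False: instance 3 is held by another pipeline
```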
**Locking a shared related resource**

Multiple main resources may need to lock the same related resource, e.g. multiple jobs may need to change a shared instance. In this case it's not sufficient to set `lock_owner` on the related resource to the pipeline name because workers processing different main resources can still race with each other. To avoid heartbeating the related resource, you may include the main resource id in `lock_owner`, e.g. set `lock_owner = f"{Pipeline.__name__}:{item.id}"`.
**Reset-and-retry when a related lock is unavailable**

If a worker cannot lock a required related resource, it should release only the main lock state needed for a fast retry: unset `lock_token` and `lock_expires_at`, keep `lock_owner`, and set `last_processed_at` to now. This avoids long waits and lets the same pipeline retry quickly on the next fetch iteration while other pipelines can still respect the ownership intent.
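As a sketch, using the same toy lock columns as above (illustrative, not `dstack`'s code):

```python
# Reset-and-retry: clear token/expiry so the row becomes fetchable again ASAP,
# but keep lock_owner so other pipelines still see the ownership intent.
import sqlite3
import time

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE jobs (id INTEGER PRIMARY KEY, lock_expires_at REAL, "
    "lock_token TEXT, lock_owner TEXT, last_processed_at REAL)"
)
conn.execute(
    "INSERT INTO jobs VALUES (1, ?, 'token-a', 'JobTerminatingPipeline', 0.0)",
    (time.time() + 30,),
)


def reset_for_retry(job_id: int) -> None:
    """Related lock unavailable: clear token/expiry, keep owner, bump last_processed_at."""
    conn.execute(
        "UPDATE jobs SET lock_token = NULL, lock_expires_at = NULL, "
        "last_processed_at = ? WHERE id = ?",
        (time.time(), job_id),
    )


reset_for_retry(1)
row = conn.execute(
    "SELECT lock_token, lock_expires_at, lock_owner FROM jobs WHERE id = 1"
).fetchone()
```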
**Dealing with side effects**

If processing has side effects and the apply phase fails due to a lock mismatch, there are several options: a) revert the side effects; b) make processing idempotent, i.e. the next processing iteration detects the side effects and does not perform duplicate actions; c) as a temporary solution, log the side effects as errors and warn the user about possible issues such as orphaned instances.

**Bulk apply with one consistent current time**

When apply needs to update multiple rows (main + related resources), build update maps/update rows first and resolve current-time placeholders once in the apply transaction using `NOW_PLACEHOLDER` + `resolve_now_placeholders()`. This keeps timestamps consistent across all rows and avoids subtle ordering bugs when the same processing pass writes several `*_at` fields.
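A sketch of the placeholder idea. The `NOW_PLACEHOLDER`/`resolve_now_placeholders()` names come from the text above, but the implementation below is illustrative, not `dstack`'s actual one:

```python
# Resolve all current-time placeholders once, so every row written in the same
# apply pass gets the exact same timestamp.
from datetime import datetime, timezone

NOW_PLACEHOLDER = object()  # sentinel resolved once at apply time


def resolve_now_placeholders(update_maps: list) -> list:
    """Replace every placeholder with a single consistent current time."""
    now = datetime.now(timezone.utc)
    return [
        {k: (now if v is NOW_PLACEHOLDER else v) for k, v in m.items()}
        for m in update_maps
    ]


# Built during processing, outside any DB transaction:
updates = [
    {"id": 1, "status": "terminated", "last_processed_at": NOW_PLACEHOLDER},
    {"id": 2, "finished_at": NOW_PLACEHOLDER},
]

resolved = resolve_now_placeholders(updates)
# Both rows carry the exact same timestamp.
```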
## Performance analysis

* Pipeline throughput = workers_num / worker_processing_time. Quick tasks easily give high-throughput pipelines, e.g. a 1s task with 20 workers gives 1200 tasks/min. A slow 30s task gives only 40 tasks/min with the same number of workers. We can increase the number of workers, but peak memory usage will grow proportionally. In general, workers should be optimized to be as quick as possible to improve throughput.
* Processing latency (wait) is close to 0 thanks to fetch hints if the pipeline is not saturated. In general, latency = queue_size / throughput.
* The in-memory queue's maxsize provides a cap on memory usage and on recovery time after crashes (the number of locked items to retry).
* The fetcher's DB load is proportional to the number of pipelines and is expected to be negligible. Workers can put a considerable read/write load on the DB, as it's proportional to the number of workers. This can be optimized by batching workers' writes. Workers do processing outside of transactions, so DB connections won't be a bottleneck.
* There is a risk of lock starvation if a worker needs to lock all related resources. This is mitigated by 1) related pipelines checking `lock_owner` and skipping locking to let the parent pipeline acquire all the locks eventually, and 2) doing the related resource locking only on paths that require it.
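The throughput and latency formulas above, checked numerically (plain arithmetic, no `dstack` code):

```python
# throughput = workers_num / worker_processing_time; latency = queue_size / throughput.
def throughput_per_min(workers_num: int, worker_processing_time_s: float) -> float:
    """Tasks per minute a saturated pipeline can sustain."""
    return workers_num / worker_processing_time_s * 60


fast = throughput_per_min(20, 1.0)   # 1s task, 20 workers -> 1200 tasks/min
slow = throughput_per_min(20, 30.0)  # 30s task, 20 workers -> 40 tasks/min


def latency_min(queue_size: int, tasks_per_min: float) -> float:
    """Expected wait in minutes for an item at the back of the queue."""
    return queue_size / tasks_per_min


wait = latency_min(100, fast)  # 100 queued items at 1200/min ~= 0.083 min
```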

src/dstack/_internal/server/background/pipeline_tasks/__init__.py

Lines changed: 4 additions & 0 deletions

@@ -5,6 +5,9 @@
 from dstack._internal.server.background.pipeline_tasks.fleets import FleetPipeline
 from dstack._internal.server.background.pipeline_tasks.gateways import GatewayPipeline
 from dstack._internal.server.background.pipeline_tasks.instances import InstancePipeline
+from dstack._internal.server.background.pipeline_tasks.jobs_terminating import (
+    JobTerminatingPipeline,
+)
 from dstack._internal.server.background.pipeline_tasks.placement_groups import (
     PlacementGroupPipeline,
 )
@@ -20,6 +23,7 @@ def __init__(self) -> None:
         ComputeGroupPipeline(),
         FleetPipeline(),
         GatewayPipeline(),
+        JobTerminatingPipeline(),
         InstancePipeline(),
         PlacementGroupPipeline(),
         VolumePipeline(),

src/dstack/_internal/server/background/pipeline_tasks/fleets.py

Lines changed: 3 additions & 3 deletions

@@ -152,7 +152,7 @@ async def fetch(self, limit: int) -> list[PipelineItem]:
         )
         .order_by(FleetModel.last_processed_at.asc())
         .limit(limit)
-        .with_for_update(skip_locked=True, key_share=True)
+        .with_for_update(skip_locked=True, key_share=True, of=FleetModel)
         .options(
             load_only(
                 FleetModel.id,
@@ -352,7 +352,7 @@ async def _lock_fleet_instances_for_consolidation(
                 InstanceModel.lock_owner == FleetPipeline.__name__,
             ),
         )
-        .with_for_update(skip_locked=True, key_share=True)
+        .with_for_update(skip_locked=True, key_share=True, of=InstanceModel)
     )
     locked_instance_models = list(res.scalars().all())
     locked_instance_ids = {instance_model.id for instance_model in locked_instance_models}
@@ -369,7 +369,7 @@ async def _lock_fleet_instances_for_consolidation(
         "Failed to lock fleet %s instances. The fleet will be processed later.",
         item.id,
     )
-    # Keep `lock_owner` so that `InstancePipeline` sees that the fleet is being locked
+    # Keep `lock_owner` so that `InstancePipeline` can check that the fleet is being locked
     # but unset `lock_expires_at` to process the item again ASAP (after `min_processing_interval`).
     # Unset `lock_token` so that heartbeater can no longer update the item.
     res = await session.execute(

src/dstack/_internal/server/background/pipeline_tasks/gateways.py

Lines changed: 1 addition & 1 deletion

@@ -152,7 +152,7 @@ async def fetch(self, limit: int) -> list[GatewayPipelineItem]:
         )
         .order_by(GatewayModel.last_processed_at.asc())
         .limit(limit)
-        .with_for_update(skip_locked=True, key_share=True)
+        .with_for_update(skip_locked=True, key_share=True, of=GatewayModel)
         .options(
             load_only(
                 GatewayModel.id,
