Skip to content

Commit 93bb51c

Browse files
authored
Add probe until_ready configuration option (#3530)
This option allows a probe to stop being executed once it reaches the `ready_after` threshold of successful executions. This is useful for heavier probes that only need to run during the startup phase and not during regular replica operation.
1 parent 1b65d8f commit 93bb51c

6 files changed

Lines changed: 99 additions & 12 deletions

File tree

src/dstack/_internal/core/compatibility/runs.py

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
from typing import Optional
22

33
from dstack._internal.core.models.common import IncludeExcludeDictType, IncludeExcludeSetType
4+
from dstack._internal.core.models.configurations import ServiceConfiguration
45
from dstack._internal.core.models.runs import (
6+
DEFAULT_PROBE_UNTIL_READY,
57
DEFAULT_REPLICA_GROUP_NAME,
68
ApplyRunPlanInput,
79
JobSpec,
@@ -60,12 +62,12 @@ def get_run_spec_excludes(run_spec: RunSpec) -> IncludeExcludeDictType:
6062
configuration_excludes: IncludeExcludeDictType = {}
6163
profile_excludes: IncludeExcludeSetType = set()
6264

63-
# Add excludes like this:
64-
#
65-
# if run_spec.configuration.tags is None:
66-
# configuration_excludes["tags"] = True
67-
# if run_spec.profile is not None and run_spec.profile.tags is None:
68-
# profile_excludes.add("tags")
65+
if isinstance(run_spec.configuration, ServiceConfiguration):
66+
if run_spec.configuration.probes:
67+
probe_excludes: IncludeExcludeDictType = {}
68+
configuration_excludes["probes"] = {"__all__": probe_excludes}
69+
if all(p.until_ready is None for p in run_spec.configuration.probes):
70+
probe_excludes["until_ready"] = True
6971

7072
if configuration_excludes:
7173
spec_excludes["configuration"] = configuration_excludes
@@ -83,4 +85,10 @@ def get_job_spec_excludes(job_specs: list[JobSpec]) -> IncludeExcludeDictType:
8385
spec_excludes: IncludeExcludeDictType = {}
8486
if all(s.replica_group == DEFAULT_REPLICA_GROUP_NAME for s in job_specs):
8587
spec_excludes["replica_group"] = True
88+
89+
probe_excludes: IncludeExcludeDictType = {}
90+
spec_excludes["probes"] = {"__all__": probe_excludes}
91+
if all(all(p.until_ready == DEFAULT_PROBE_UNTIL_READY for p in s.probes) for s in job_specs):
92+
probe_excludes["until_ready"] = True
93+
8694
return spec_excludes

src/dstack/_internal/core/models/configurations.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
DEFAULT_PROBE_INTERVAL = 15
5555
DEFAULT_PROBE_READY_AFTER = 1
5656
DEFAULT_PROBE_METHOD = "get"
57+
DEFAULT_PROBE_UNTIL_READY = False
5758
MAX_PROBE_URL_LEN = 2048
5859
DEFAULT_REPLICA_GROUP_NAME = "0"
5960
DEFAULT_MODEL_PROBE_TIMEOUT = 30
@@ -374,6 +375,16 @@ class ProbeConfig(generate_dual_core_model(ProbeConfigConfig)):
374375
),
375376
),
376377
] = None
378+
until_ready: Annotated[
379+
Optional[bool],
380+
Field(
381+
description=(
382+
"If `true`, the probe will stop being executed as soon as it reaches the"
383+
" `ready_after` threshold of successful executions."
384+
f" Defaults to `{str(DEFAULT_PROBE_UNTIL_READY).lower()}`"
385+
),
386+
),
387+
] = None
377388

378389
@validator("timeout", pre=True)
379390
def parse_timeout(cls, v: Optional[Union[int, str]]) -> Optional[int]:

src/dstack/_internal/core/models/runs.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
)
1818
from dstack._internal.core.models.configurations import (
1919
DEFAULT_PROBE_METHOD,
20+
DEFAULT_PROBE_UNTIL_READY,
2021
DEFAULT_REPLICA_GROUP_NAME,
2122
LEGACY_REPO_DIR,
2223
AnyRunConfiguration,
@@ -247,6 +248,7 @@ class ProbeSpec(CoreModel):
247248
timeout: int
248249
interval: int
249250
ready_after: int
251+
until_ready: bool = DEFAULT_PROBE_UNTIL_READY
250252

251253

252254
class JobSpec(CoreModel):

src/dstack/_internal/server/background/tasks/process_probes.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,12 +73,15 @@ async def process_probes():
7373
else:
7474
job_spec: JobSpec = JobSpec.__response__.parse_raw(probe.job.job_spec_data)
7575
probe_spec = job_spec.probes[probe.probe_num]
76-
# Schedule the next probe execution in case this execution is interrupted
77-
probe.due = get_current_datetime() + _get_probe_async_processing_timeout(
78-
probe_spec
79-
)
80-
# Execute the probe asynchronously outside of the DB session
81-
PROBES_SCHEDULER.add_job(partial(_process_probe_async, probe, probe_spec))
76+
if probe_spec.until_ready and probe.success_streak >= probe_spec.ready_after:
77+
probe.active = False
78+
else:
79+
# Schedule the next probe execution in case this execution is interrupted
80+
probe.due = get_current_datetime() + _get_probe_async_processing_timeout(
81+
probe_spec
82+
)
83+
# Execute the probe asynchronously outside of the DB session
84+
PROBES_SCHEDULER.add_job(partial(_process_probe_async, probe, probe_spec))
8285
await session.commit()
8386
finally:
8487
probe_lockset.difference_update(probe_ids)

src/dstack/_internal/server/services/jobs/configurators/base.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
DEFAULT_PROBE_METHOD,
1919
DEFAULT_PROBE_READY_AFTER,
2020
DEFAULT_PROBE_TIMEOUT,
21+
DEFAULT_PROBE_UNTIL_READY,
2122
DEFAULT_PROBE_URL,
2223
DEFAULT_REPLICA_GROUP_NAME,
2324
LEGACY_REPO_DIR,
@@ -455,6 +456,7 @@ def _probe_config_to_spec(c: ProbeConfig) -> ProbeSpec:
455456
method=c.method if c.method is not None else DEFAULT_PROBE_METHOD,
456457
headers=c.headers,
457458
body=c.body,
459+
until_ready=c.until_ready if c.until_ready is not None else DEFAULT_PROBE_UNTIL_READY,
458460
)
459461

460462

src/tests/_internal/server/background/tasks/test_process_probes.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,67 @@ async def test_schedules_probe_execution(self, test_db, session: AsyncSession) -
163163
+ PROCESSING_OVERHEAD_TIMEOUT
164164
)
165165

166+
async def test_deactivates_probe_when_until_ready_and_ready_after_reached(
167+
self, test_db, session: AsyncSession
168+
) -> None:
169+
project = await create_project(session=session)
170+
user = await create_user(session=session)
171+
repo = await create_repo(
172+
session=session,
173+
project_id=project.id,
174+
)
175+
run = await create_run(
176+
session=session,
177+
project=project,
178+
repo=repo,
179+
user=user,
180+
run_spec=get_run_spec(
181+
run_name="test",
182+
repo_id=repo.name,
183+
configuration=ServiceConfiguration(
184+
port=80,
185+
image="nginx",
186+
probes=[
187+
ProbeConfig(
188+
type="http", url="/until_ready", until_ready=True, ready_after=3
189+
),
190+
ProbeConfig(type="http", url="/regular", until_ready=False, ready_after=3),
191+
],
192+
),
193+
),
194+
)
195+
instance = await create_instance(
196+
session=session,
197+
project=project,
198+
status=InstanceStatus.BUSY,
199+
)
200+
job = await create_job(
201+
session=session,
202+
run=run,
203+
status=JobStatus.RUNNING,
204+
job_provisioning_data=get_job_provisioning_data(),
205+
instance=instance,
206+
instance_assigned=True,
207+
)
208+
209+
probe_until_ready = await create_probe(session, job, probe_num=0, success_streak=3)
210+
probe_regular = await create_probe(session, job, probe_num=1, success_streak=3)
211+
212+
with patch(
213+
"dstack._internal.server.background.tasks.process_probes.PROBES_SCHEDULER"
214+
) as scheduler_mock:
215+
await process_probes()
216+
217+
await session.refresh(probe_until_ready)
218+
await session.refresh(probe_regular)
219+
220+
assert not probe_until_ready.active
221+
assert probe_until_ready.success_streak == 3
222+
223+
assert probe_regular.active
224+
assert probe_regular.success_streak == 3
225+
assert scheduler_mock.add_job.call_count == 1 # only the regular probe was scheduled
226+
166227

167228
# TODO: test probe success and failure
168229
# (skipping for now - a bit difficult to test and most of the logic will be mocked)

0 commit comments

Comments
 (0)