Skip to content

Commit ca2172c

Browse files
authored
Events: instance/job reachability and health (#3482)
Add new events: job became reachable/unreachable; instance became reachable/unreachable; instance health changed. Additionally, do not set `unreachable=True` for instances in statuses other than `idle` and `busy`, to avoid unnecessary and potentially misleading events during instance provisioning.
1 parent 883b455 commit ca2172c

File tree

4 files changed

+68
-6
lines changed

4 files changed

+68
-6
lines changed

src/dstack/_internal/server/background/tasks/process_instances.py

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
from dstack._internal.core.models.backends.base import BackendType
4444
from dstack._internal.core.models.fleets import InstanceGroupPlacement
4545
from dstack._internal.core.models.instances import (
46+
HealthStatus,
4647
InstanceAvailability,
4748
InstanceOfferWithAvailability,
4849
InstanceRuntime,
@@ -75,6 +76,7 @@
7576
InstanceHealthResponse,
7677
)
7778
from dstack._internal.server.services import backends as backends_services
79+
from dstack._internal.server.services import events
7880
from dstack._internal.server.services.fleets import (
7981
fleet_model_to_fleet,
8082
get_create_instance_offers,
@@ -759,8 +761,8 @@ async def _check_instance(session: AsyncSession, instance: InstanceModel) -> Non
759761
)
760762
session.add(health_check_model)
761763

762-
instance.health = health_status
763-
instance.unreachable = not instance_check.reachable
764+
_set_health(session, instance, health_status)
765+
_set_unreachable(session, instance, unreachable=not instance_check.reachable)
764766

765767
if instance_check.reachable:
766768
instance.termination_deadline = None
@@ -1093,6 +1095,31 @@ async def _terminate(session: AsyncSession, instance: InstanceModel) -> None:
10931095
switch_instance_status(session, instance, InstanceStatus.TERMINATED)
10941096

10951097

1098+
def _set_health(session: AsyncSession, instance: InstanceModel, health: HealthStatus) -> None:
    """
    Store a new health status on the instance and emit an event describing the change.

    Does nothing if `health` is equal to the instance's current health.
    """
    health_changed = instance.health != health
    if not health_changed:
        return
    # Build the message before assigning: it reads the previous health value.
    message = f"Instance health changed {instance.health.upper()} -> {health.upper()}"
    events.emit(
        session,
        message,
        actor=events.SystemActor(),
        targets=[events.Target.from_model(instance)],
    )
    instance.health = health
1107+
1108+
1109+
def _set_unreachable(session: AsyncSession, instance: InstanceModel, unreachable: bool) -> None:
    """
    Update the instance's `unreachable` flag and emit a reachability event.

    The flag is left untouched (and no event is emitted) when the instance status
    is not available or when the flag already has the requested value.
    """
    # Avoid a misleading event during provisioning.
    if not instance.status.is_available():
        return
    flag_changed = instance.unreachable != unreachable
    if not flag_changed:
        return
    if unreachable:
        message = "Instance became unreachable"
    else:
        message = "Instance became reachable"
    events.emit(
        session,
        message,
        actor=events.SystemActor(),
        targets=[events.Target.from_model(instance)],
    )
    instance.unreachable = unreachable
1121+
1122+
10961123
def _next_termination_retry_at(instance: InstanceModel) -> datetime.datetime:
10971124
assert instance.last_termination_retry_at is not None
10981125
return instance.last_termination_retry_at + TERMINATION_RETRY_TIMEOUT

src/dstack/_internal/server/background/tasks/process_running_jobs.py

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,9 @@
5151
UserModel,
5252
)
5353
from dstack._internal.server.schemas.runner import GPUDevice, TaskStatus
54+
from dstack._internal.server.services import events, services
5455
from dstack._internal.server.services import files as files_services
5556
from dstack._internal.server.services import logs as logs_services
56-
from dstack._internal.server.services import services
5757
from dstack._internal.server.services.instances import get_instance_ssh_private_keys
5858
from dstack._internal.server.services.jobs import (
5959
find_job,
@@ -355,7 +355,7 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel):
355355
)
356356

357357
if success:
358-
job_model.disconnected_at = None
358+
_reset_disconnected_at(session, job_model)
359359
else:
360360
if job_model.termination_reason:
361361
logger.warning(
@@ -368,8 +368,7 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel):
368368
# job will be terminated and instance will be emptied by process_terminating_jobs
369369
else:
370370
# No job_model.termination_reason set means ssh connection failed
371-
if job_model.disconnected_at is None:
372-
job_model.disconnected_at = common_utils.get_current_datetime()
371+
_set_disconnected_at_now(session, job_model)
373372
if _should_terminate_job_due_to_disconnect(job_model):
374373
# TODO: Replace with JobTerminationReason.INSTANCE_UNREACHABLE for on-demand.
375374
job_model.termination_reason = JobTerminationReason.INTERRUPTED_BY_NO_CAPACITY
@@ -933,6 +932,28 @@ def _should_terminate_due_to_low_gpu_util(min_util: int, gpus_util: Iterable[Ite
933932
return False
934933

935934

935+
def _set_disconnected_at_now(session: AsyncSession, job_model: JobModel) -> None:
    """
    Mark the job as disconnected as of the current time and emit an event.

    Does nothing if `job_model.disconnected_at` is already set, so the original
    disconnection time is kept and the event is not repeated.
    """
    if job_model.disconnected_at is not None:
        return
    now = common_utils.get_current_datetime()
    job_model.disconnected_at = now
    events.emit(
        session,
        "Job became unreachable",
        actor=events.SystemActor(),
        targets=[events.Target.from_model(job_model)],
    )
944+
945+
946+
def _reset_disconnected_at(session: AsyncSession, job_model: JobModel) -> None:
    """
    Clear the job's disconnection time and emit an event.

    Does nothing if `job_model.disconnected_at` is already `None`.
    """
    if job_model.disconnected_at is None:
        return
    job_model.disconnected_at = None
    events.emit(
        session,
        "Job became reachable",
        actor=events.SystemActor(),
        targets=[events.Target.from_model(job_model)],
    )
955+
956+
936957
def _get_cluster_info(
937958
jobs: List[Job],
938959
replica_num: int,

src/tests/_internal/server/background/tasks/test_process_instances.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777
get_job_provisioning_data,
7878
get_placement_group_provisioning_data,
7979
get_remote_connection_info,
80+
list_events,
8081
)
8182
from dstack._internal.utils.common import get_current_datetime
8283

@@ -324,10 +325,13 @@ async def test_check_shim_process_ureachable_state(
324325
healthcheck.assert_called()
325326

326327
await session.refresh(instance)
328+
events = await list_events(session)
327329

328330
assert instance is not None
329331
assert instance.status == InstanceStatus.IDLE
330332
assert not instance.unreachable
333+
assert len(events) == 1
334+
assert events[0].message == "Instance became reachable"
331335

332336
@pytest.mark.asyncio
333337
@pytest.mark.parametrize("health_status", [HealthStatus.HEALTHY, HealthStatus.FAILURE])
@@ -351,12 +355,15 @@ async def test_check_shim_switch_to_unreachable_state(
351355
await process_instances()
352356

353357
await session.refresh(instance)
358+
events = await list_events(session)
354359

355360
assert instance is not None
356361
assert instance.status == InstanceStatus.IDLE
357362
assert instance.unreachable
358363
# Should keep the previous status
359364
assert instance.health == health_status
365+
assert len(events) == 1
366+
assert events[0].message == "Instance became unreachable"
360367

361368
@pytest.mark.asyncio
362369
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
@@ -384,11 +391,14 @@ async def test_check_shim_check_instance_health(self, test_db, session: AsyncSes
384391
await process_instances()
385392

386393
await session.refresh(instance)
394+
events = await list_events(session)
387395

388396
assert instance is not None
389397
assert instance.status == InstanceStatus.IDLE
390398
assert not instance.unreachable
391399
assert instance.health == HealthStatus.WARNING
400+
assert len(events) == 1
401+
assert events[0].message == "Instance health changed HEALTHY -> WARNING"
392402

393403
res = await session.execute(select(InstanceHealthCheckModel))
394404
health_check = res.scalars().one()

src/tests/_internal/server/background/tasks/test_process_running_jobs.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
get_job_runtime_data,
6969
get_run_spec,
7070
get_volume_configuration,
71+
list_events,
7172
)
7273
from dstack._internal.utils.common import get_current_datetime
7374

@@ -515,9 +516,12 @@ async def test_pulling_shim_failed(self, test_db, session: AsyncSession):
515516
await process_running_jobs()
516517
assert SSHTunnelMock.call_count == 3
517518
await session.refresh(job)
519+
events = await list_events(session)
518520
assert job is not None
519521
assert job.disconnected_at is not None
520522
assert job.status == JobStatus.PULLING
523+
assert len(events) == 1
524+
assert events[0].message == "Job became unreachable"
521525
with (
522526
patch("dstack._internal.server.services.runner.ssh.SSHTunnel") as SSHTunnelMock,
523527
patch("dstack._internal.server.services.runner.ssh.time.sleep"),

0 commit comments

Comments
 (0)