From 9ff348991100f358bb23aa797d7ee8531cbc75b3 Mon Sep 17 00:00:00 2001
From: Victor Skvortsov
Date: Fri, 13 Mar 2026 11:21:24 +0500
Subject: [PATCH 01/16] Extract submitted-jobs context

---
 .../scheduled_tasks/submitted_jobs.py         | 117 ++++++++++++------
 1 file changed, 76 insertions(+), 41 deletions(-)

diff --git a/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py b/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py
index 87b44b50a..d5e3769e1 100644
--- a/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py
+++ b/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py
@@ -2,6 +2,7 @@
 import itertools
 import uuid
 from contextlib import AsyncExitStack
+from dataclasses import dataclass
 from datetime import datetime, timedelta
 from typing import List, Optional, Union

@@ -216,49 +217,33 @@ async def _process_next_submitted_job():
         last_processed_at = common_utils.get_current_datetime()


+@dataclass
+class _SubmittedJobContext:
+    job_model: JobModel
+    run_model: RunModel
+    project: ProjectModel
+    run: Run
+    job: Job
+    replica_jobs: list[Job]
+    replica_job_models: list[JobModel]
+    fleet_model: Optional[FleetModel]
+    multinode: bool
+
+
 async def _process_submitted_job(
     exit_stack: AsyncExitStack, session: AsyncSession, job_model: JobModel
 ):
-    # Refetch to load related attributes.
-    res = await session.execute(
-        select(JobModel)
-        .where(JobModel.id == job_model.id)
-        .options(joinedload(JobModel.instance))
-        .options(
-            joinedload(JobModel.fleet).selectinload(
-                FleetModel.instances.and_(InstanceModel.deleted == False)
-            ),
-        )
-    )
-    job_model = res.unique().scalar_one()
-    res = await session.execute(
-        select(RunModel)
-        .where(RunModel.id == job_model.run_id)
-        .options(joinedload(RunModel.project).joinedload(ProjectModel.backends))
-        .options(joinedload(RunModel.user).load_only(UserModel.name))
-        .options(
-            joinedload(RunModel.fleet).selectinload(
-                FleetModel.instances.and_(InstanceModel.deleted == False)
-            ),
-        )
-    )
-    run_model = res.unique().scalar_one()
-    logger.debug("%s: provisioning has started", fmt(job_model))
+    context = await _load_submitted_job_context(session=session, job_model=job_model)
+    logger.debug("%s: provisioning has started", fmt(context.job_model))

-    project = run_model.project
-    run = run_model_to_run(run_model)
+    job_model = context.job_model
+    run_model = context.run_model
+    project = context.project
+    run = context.run
+    job = context.job
+    fleet_model = context.fleet_model
     run_spec = run.run_spec
     run_profile = run_spec.merged_profile
-    job = find_job(run.jobs, job_model.replica_num, job_model.job_num)
-    replica_jobs = find_jobs(run.jobs, replica_num=job_model.replica_num)
-    replica_job_models = _get_job_models_for_jobs(run_model.jobs, replica_jobs)
-    multinode = job.job_spec.jobs_per_replica > 1
-
-    # Master job chooses fleet for the run.
-    # Due to two-step processing, it's saved to job_model.fleet.
-    # Other jobs just inherit fleet from run_model.fleet.
-    # If master job chooses no fleet, the new fleet will be created.
-    fleet_model = run_model.fleet or job_model.fleet

     master_job = find_job(run.jobs, job_model.replica_num, 0)
     master_job_provisioning_data = None
@@ -382,13 +367,13 @@ async def _process_submitted_job(
             fleet_model=fleet_model,
             instances_with_offers=fleet_instances_with_offers,
             job_model=job_model,
-            multinode=multinode,
+            multinode=context.multinode,
         )
         job_model.last_processed_at = common_utils.get_current_datetime()
         await session.commit()
         return

-    jobs_to_provision = _get_jobs_to_provision(job, replica_jobs, job_model)
+    jobs_to_provision = _get_jobs_to_provision(job, context.replica_jobs, job_model)
     # TODO: Volume attachment for compute groups is not yet supported since
     # currently supported compute groups (e.g. Runpod) don't need explicit volume attachment.
     need_volume_attachment = True
@@ -504,7 +489,7 @@ async def _process_submitted_job(
             )
             taken_instance_nums.add(instance_num)
             provisioned_job_model.job_runtime_data = _prepare_job_runtime_data(
-                offer, multinode
+                offer, context.multinode
             ).json()
             session.add(instance)
             events.emit(
@@ -519,7 +504,9 @@ async def _process_submitted_job(
         provisioned_job_model.used_instance_id = instance.id
         provisioned_job_model.last_processed_at = common_utils.get_current_datetime()

-    _allow_other_replica_jobs_to_provision(job_model, replica_job_models, jobs_to_provision)
+    _allow_other_replica_jobs_to_provision(
+        job_model, context.replica_job_models, jobs_to_provision
+    )

     volumes_ids = sorted([v.id for vs in volume_models for v in vs])
     if need_volume_attachment:
@@ -548,6 +535,54 @@ async def _process_submitted_job(
     await session.commit()


+async def _load_submitted_job_context(
+    session: AsyncSession, job_model: JobModel
+) -> _SubmittedJobContext:
+    # Refetch to load related attributes.
+    res = await session.execute(
+        select(JobModel)
+        .where(JobModel.id == job_model.id)
+        .options(joinedload(JobModel.instance))
+        .options(
+            joinedload(JobModel.fleet).selectinload(
+                FleetModel.instances.and_(InstanceModel.deleted == False)
+            ),
+        )
+    )
+    job_model = res.unique().scalar_one()
+    res = await session.execute(
+        select(RunModel)
+        .where(RunModel.id == job_model.run_id)
+        .options(joinedload(RunModel.project).joinedload(ProjectModel.backends))
+        .options(joinedload(RunModel.user).load_only(UserModel.name))
+        .options(
+            joinedload(RunModel.fleet).selectinload(
+                FleetModel.instances.and_(InstanceModel.deleted == False)
+            ),
+        )
+    )
+    run_model = res.unique().scalar_one()
+
+    run = run_model_to_run(run_model)
+    job = find_job(run.jobs, job_model.replica_num, job_model.job_num)
+    replica_jobs = find_jobs(run.jobs, replica_num=job_model.replica_num)
+    return _SubmittedJobContext(
+        job_model=job_model,
+        run_model=run_model,
+        project=run_model.project,
+        run=run,
+        job=job,
+        replica_jobs=replica_jobs,
+        replica_job_models=_get_job_models_for_jobs(run_model.jobs, replica_jobs),
+        # Master job chooses fleet for the run.
+        # Due to two-step processing, it's saved to job_model.fleet.
+        # Other jobs just inherit fleet from run_model.fleet.
+        # If master job chooses no fleet, the new fleet will be created.
+        fleet_model=run_model.fleet or job_model.fleet,
+        multinode=job.job_spec.jobs_per_replica > 1,
+    )
+
+
 async def _refetch_fleet_models_with_instances(
     session: AsyncSession,
     fleets_ids: list[uuid.UUID],

From 19931b5712a6697ecc0559da507045b65663dc3a Mon Sep 17 00:00:00 2001
From: Victor Skvortsov
Date: Fri, 13 Mar 2026 11:35:27 +0500
Subject: [PATCH 02/16] Extract submitted-jobs exit helpers

---
 .../scheduled_tasks/submitted_jobs.py         | 107 +++++++++++------
 1 file changed, 69 insertions(+), 38 deletions(-)

diff --git a/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py b/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py
index d5e3769e1..44042e523 100644
--- a/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py
+++ b/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py
@@ -4,7 +4,7 @@
 from contextlib import AsyncExitStack
 from dataclasses import dataclass
 from datetime import datetime, timedelta
-from typing import List, Optional, Union
+from typing import List, Optional, Union, cast

 from sqlalchemy import exists, func, select
 from sqlalchemy.ext.asyncio import AsyncSession
@@ -249,18 +249,22 @@ async def _process_submitted_job(
     master_job_provisioning_data = None
     if job.job_spec.job_num != 0:
         if master_job.job_submissions[-1].job_provisioning_data is None:
-            logger.debug("%s: waiting for master job to be provisioned", fmt(job_model))
-            job_model.last_processed_at = common_utils.get_current_datetime()
-            await session.commit()
+            await _defer_submitted_job(
+                session=session,
+                job_model=job_model,
+                log_message="waiting for master job to be provisioned",
+            )
             return
         master_job_provisioning_data = JobProvisioningData.__response__.parse_obj(
             master_job.job_submissions[-1].job_provisioning_data
         )
     if job.job_spec.job_num != 0 or job.job_spec.replica_num != 0:
         if run_model.fleet_id is None:
-            logger.debug("%s: waiting for the run to be assigned to the fleet", fmt(job_model))
-            job_model.last_processed_at = common_utils.get_current_datetime()
-            await session.commit()
+            await _defer_submitted_job(
+                session=session,
+                job_model=job_model,
+                log_message="waiting for the run to be assigned to the fleet",
+            )
             return
     try:
         volume_models = await get_job_configured_volume_models(
             session=session,
             project=project,
             run_spec=run_spec,
             job_num=job.job_spec.job_num,
             job_spec=job.job_spec,
         )
         volumes = await get_job_configured_volumes(
             session=session,
             project=project,
             run_spec=run_spec,
             job_num=job.job_spec.job_num,
             job_spec=job.job_spec,
         )
         check_can_attach_job_volumes(volumes)
     except ServerClientError as e:
         logger.warning("%s: failed to prepare run volumes: %s", fmt(job_model), repr(e))
-        job_model.termination_reason = JobTerminationReason.VOLUME_ERROR
-        job_model.termination_reason_message = e.msg
-        switch_job_status(session, job_model, JobStatus.TERMINATING)
-        job_model.last_processed_at = common_utils.get_current_datetime()
-        await session.commit()
+        await _terminate_submitted_job(
+            session=session,
+            job_model=job_model,
+            reason=JobTerminationReason.VOLUME_ERROR,
+            message=e.msg,
+        )
         return

     # Submitted jobs processing happens in two steps (transactions).
@@ -340,27 +345,25 @@ async def _process_submitted_job(
         if run_spec.merged_profile.fleets is not None:
             # Run cannot create new fleets when fleets are specified
             logger.debug("%s: failed to use specified fleets", fmt(job_model))
-            job_model.termination_reason = (
-                JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY
+            await _terminate_submitted_job(
+                session=session,
+                job_model=job_model,
+                reason=JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY,
+                message="Failed to use specified fleets",
             )
-            job_model.termination_reason_message = "Failed to use specified fleets"
-            switch_job_status(session, job_model, JobStatus.TERMINATING)
-            job_model.last_processed_at = common_utils.get_current_datetime()
-            await session.commit()
             return
         if not FeatureFlags.AUTOCREATED_FLEETS_ENABLED:
             logger.debug("%s: no fleet found", fmt(job_model))
-            job_model.termination_reason = (
-                JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY
-            )
             # Note: `_get_job_status_message` relies on the "No fleet found" substring to return "no fleets"
-            job_model.termination_reason_message = (
-                "No matching fleet found. Possible reasons: "
-                "https://dstack.ai/docs/guides/troubleshooting/#no-fleets"
+            await _terminate_submitted_job(
+                session=session,
+                job_model=job_model,
+                reason=JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY,
+                message=(
+                    "No matching fleet found. Possible reasons: "
+                    "https://dstack.ai/docs/guides/troubleshooting/#no-fleets"
+                ),
             )
-            switch_job_status(session, job_model, JobStatus.TERMINATING)
-            job_model.last_processed_at = common_utils.get_current_datetime()
-            await session.commit()
             return
         instance = await _assign_job_to_fleet_instance(
             session=session,
             fleet_model=fleet_model,
             instances_with_offers=fleet_instances_with_offers,
             job_model=job_model,
             multinode=context.multinode,
         )
-        job_model.last_processed_at = common_utils.get_current_datetime()
-        await session.commit()
+        await _mark_job_processed(session=session, job_model=job_model)
         return

     jobs_to_provision = _get_jobs_to_provision(job, context.replica_jobs, job_model)
@@ -390,11 +392,12 @@ async def _process_submitted_job(
     else:
         if run_profile.creation_policy == CreationPolicy.REUSE:
             logger.debug("%s: reuse instance failed", fmt(job_model))
-            job_model.termination_reason = JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY
-            job_model.termination_reason_message = "Could not reuse any instances for this job"
-            switch_job_status(session, job_model, JobStatus.TERMINATING)
-            job_model.last_processed_at = common_utils.get_current_datetime()
-            await session.commit()
+            await _terminate_submitted_job(
+                session=session,
+                job_model=job_model,
+                reason=JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY,
+                message="Could not reuse any instances for this job",
+            )
             return

         (
@@ -423,10 +426,11 @@ async def _process_submitted_job(
         )
         if run_job_result is None:
             logger.debug("%s: provisioning failed", fmt(job_model))
-            job_model.termination_reason = JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY
-            switch_job_status(session, job_model, JobStatus.TERMINATING)
-            job_model.last_processed_at = common_utils.get_current_datetime()
-            await session.commit()
+            await _terminate_submitted_job(
+                session=session,
+                job_model=job_model,
+                reason=JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY,
+            )
             return

         if fleet_model is None:
@@ -583,6 +587,33 @@ async def _process_submitted_job(
     await session.commit()


+async def _defer_submitted_job(
+    session: AsyncSession,
+    job_model: JobModel,
+    log_message: str,
+):
+    logger.debug("%s: %s", fmt(job_model), log_message)
+    await _mark_job_processed(session=session, job_model=job_model)
+
+
+async def _terminate_submitted_job(
+    session: AsyncSession,
+    job_model: JobModel,
+    reason: JobTerminationReason,
+    message: object = common_utils.UNSET,
+):
+    job_model.termination_reason = reason
+    if message is not common_utils.UNSET:
+        job_model.termination_reason_message = cast(Optional[str], message)
+    switch_job_status(session, job_model, JobStatus.TERMINATING)
+    await _mark_job_processed(session=session, job_model=job_model)
+
+
+async def _mark_job_processed(session: AsyncSession, job_model: JobModel):
+    job_model.last_processed_at = common_utils.get_current_datetime()
+    await session.commit()
+
+
 async def _refetch_fleet_models_with_instances(
     session: AsyncSession,
     fleets_ids: list[uuid.UUID],

From 5f92b86949b42df19d87b20fc02171601f5d2e0e Mon Sep 17 00:00:00 2001
From: Victor Skvortsov
Date: Fri, 13 Mar 2026 11:58:02 +0500
Subject: [PATCH 03/16] Extract submitted-jobs preconditions

---
 .../scheduled_tasks/submitted_jobs.py         | 163 +++++++++++++-----
 1 file changed, 118 insertions(+), 45 deletions(-)

diff --git a/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py b/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py
index 44042e523..11aa30c90 100644
--- a/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py
+++ b/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py
@@ -230,6 +230,17 @@ class _SubmittedJobContext:
     multinode: bool


+@dataclass
+class _MasterJobDependency:
+    provisioning_data: Optional[JobProvisioningData]
+
+
+@dataclass
+class _PreparedJobVolumes:
+    volume_models: list[list[VolumeModel]]
+    volumes: list[list[Volume]]
+
+
 async def _process_submitted_job(
     exit_stack: AsyncExitStack, session: AsyncSession, job_model: JobModel
 ):
@@ -245,53 +256,36 @@ async def _process_submitted_job(
     run_spec = run.run_spec
     run_profile = run_spec.merged_profile

-    master_job = find_job(run.jobs, job_model.replica_num, 0)
-    master_job_provisioning_data = None
-    if job.job_spec.job_num != 0:
-        if master_job.job_submissions[-1].job_provisioning_data is None:
-            await _defer_submitted_job(
-                session=session,
-                job_model=job_model,
-                log_message="waiting for master job to be provisioned",
-            )
-            return
-        master_job_provisioning_data = JobProvisioningData.__response__.parse_obj(
-            master_job.job_submissions[-1].job_provisioning_data
-        )
-    if job.job_spec.job_num != 0 or job.job_spec.replica_num != 0:
-        if run_model.fleet_id is None:
-            await _defer_submitted_job(
-                session=session,
-                job_model=job_model,
-                log_message="waiting for the run to be assigned to the fleet",
-            )
-            return
-    try:
-        volume_models = await get_job_configured_volume_models(
-            session=session,
-            project=project,
-            run_spec=run_spec,
-            job_num=job.job_spec.job_num,
-            job_spec=job.job_spec,
-        )
-        volumes = await get_job_configured_volumes(
-            session=session,
-            project=project,
-            run_spec=run_spec,
-            job_num=job.job_spec.job_num,
-            job_spec=job.job_spec,
-        )
-        check_can_attach_job_volumes(volumes)
-    except ServerClientError as e:
-        logger.warning("%s: failed to prepare run volumes: %s", fmt(job_model), repr(e))
-        await _terminate_submitted_job(
-            session=session,
-            job_model=job_model,
-            reason=JobTerminationReason.VOLUME_ERROR,
-            message=e.msg,
-        )
+    master_job_dependency = await _resolve_master_job_dependency(
+        session=session,
+        job_model=job_model,
+        run=run,
+        job=job,
+    )
+    if master_job_dependency is None:
+        return
+    master_job_provisioning_data = master_job_dependency.provisioning_data
+
+    if not await _resolve_fleet_dependency(
+        session=session,
+        job_model=job_model,
+        run_model=run_model,
+        job=job,
+    ):
         return

+    prepared_job_volumes = await _prepare_job_volumes(
+        session=session,
+        job_model=job_model,
+        project=project,
+        run_spec=run_spec,
+        job=job,
+    )
+    if prepared_job_volumes is None:
+        return
+    volume_models = prepared_job_volumes.volume_models
+    volumes = prepared_job_volumes.volumes
+
     # Submitted jobs processing happens in two steps (transactions).
     # First, the job gets an instance assigned (or no instance).
     # Then, the job runs on the assigned instance or a new instance is provisioned.
@@ -587,6 +581,85 @@ async def _load_submitted_job_context(
     )


+async def _resolve_master_job_dependency(
+    session: AsyncSession,
+    job_model: JobModel,
+    run: Run,
+    job: Job,
+) -> Optional[_MasterJobDependency]:
+    if job.job_spec.job_num == 0:
+        return _MasterJobDependency(provisioning_data=None)
+    master_job = find_job(run.jobs, job_model.replica_num, 0)
+    if master_job.job_submissions[-1].job_provisioning_data is None:
+        await _defer_submitted_job(
+            session=session,
+            job_model=job_model,
+            log_message="waiting for master job to be provisioned",
+        )
+        return None
+    return _MasterJobDependency(
+        provisioning_data=JobProvisioningData.__response__.parse_obj(
+            master_job.job_submissions[-1].job_provisioning_data
+        )
+    )
+
+
+async def _resolve_fleet_dependency(
+    session: AsyncSession,
+    job_model: JobModel,
+    run_model: RunModel,
+    job: Job,
+) -> bool:
+    if job.job_spec.job_num == 0 and job.job_spec.replica_num == 0:
+        return True
+    if run_model.fleet_id is not None:
+        return True
+    await _defer_submitted_job(
+        session=session,
+        job_model=job_model,
+        log_message="waiting for the run to be assigned to the fleet",
+    )
+    return False
+
+
+async def _prepare_job_volumes(
+    session: AsyncSession,
+    job_model: JobModel,
+    project: ProjectModel,
+    run_spec,
+    job: Job,
+) -> Optional[_PreparedJobVolumes]:
+    try:
+        volume_models = await get_job_configured_volume_models(
+            session=session,
+            project=project,
+            run_spec=run_spec,
+            job_num=job.job_spec.job_num,
+            job_spec=job.job_spec,
+        )
+        volumes = await get_job_configured_volumes(
+            session=session,
+            project=project,
+            run_spec=run_spec,
+            job_num=job.job_spec.job_num,
+            job_spec=job.job_spec,
+        )
+        check_can_attach_job_volumes(volumes)
+    except ServerClientError as e:
+        logger.warning("%s: failed to prepare run volumes: %s", fmt(job_model), repr(e))
+        await _terminate_submitted_job(
+            session=session,
+            job_model=job_model,
+            reason=JobTerminationReason.VOLUME_ERROR,
+            message=e.msg,
+        )
+        return None
+    return _PreparedJobVolumes(
+        volume_models=volume_models,
+        volumes=volumes,
+    )
+
+
 async def _defer_submitted_job(
     session: AsyncSession,
     job_model: JobModel,

From 10f469478a4726b654df4f864c6cf3eef097b6a3 Mon Sep 17 00:00:00 2001
From: Victor Skvortsov
Date: Fri, 13 Mar 2026 12:55:42 +0500
Subject: [PATCH 04/16] Extract submitted-jobs assignment phase

---
 .../scheduled_tasks/submitted_jobs.py         | 180 +++++++++++------
 1 file changed, 107 insertions(+), 73 deletions(-)

diff --git a/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py b/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py
index 11aa30c90..fbf4dc89b 100644
--- a/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py
+++ b/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py
@@ -290,83 +290,14 @@ async def _process_submitted_job(
     # First, the job gets an instance assigned (or no instance).
     # Then, the job runs on the assigned instance or a new instance is provisioned.
     # This is needed to avoid holding instances lock for a long time.
-    if not job_model.instance_assigned:
-        fleet_filters, instance_filters = await get_run_candidate_fleet_models_filters(
-            session=session,
-            project=project,
-            run_model=run_model,
-            run_spec=run_spec,
-        )
-        (
-            fleet_models_with_instances,
-            fleet_models_without_instances,
-        ) = await select_run_candidate_fleet_models_with_filters(
+    if not context.job_model.instance_assigned:
+        await _process_assignment_phase(
+            exit_stack=exit_stack,
             session=session,
-            fleet_filters=fleet_filters,
-            instance_filters=instance_filters,
-            lock_instances=True,
-        )
-        instances_ids = sorted(
-            itertools.chain.from_iterable(
-                [i.id for i in f.instances] for f in fleet_models_with_instances
-            )
-        )
-        await sqlite_commit(session)
-        await exit_stack.enter_async_context(
-            get_locker(get_db().dialect_name).lock_ctx(InstanceModel.__tablename__, instances_ids)
-        )
-        if is_db_sqlite():
-            fleets_with_instances_ids = [f.id for f in fleet_models_with_instances]
-            fleet_models_with_instances = await _refetch_fleet_models_with_instances(
-                session=session,
-                fleets_ids=fleets_with_instances_ids,
-                instances_ids=instances_ids,
-                fleet_filters=fleet_filters,
-                instance_filters=instance_filters,
-            )
-        fleet_models = fleet_models_with_instances + fleet_models_without_instances
-        fleet_model, fleet_instances_with_offers, _ = await find_optimal_fleet_with_offers(
-            project=project,
-            fleet_models=fleet_models,
-            run_model=run_model,
-            run_spec=run.run_spec,
-            job=job,
+            context=context,
             master_job_provisioning_data=master_job_provisioning_data,
             volumes=volumes,
-            exclude_not_available=True,
         )
-        if fleet_model is None:
-            if run_spec.merged_profile.fleets is not None:
-                # Run cannot create new fleets when fleets are specified
-                logger.debug("%s: failed to use specified fleets", fmt(job_model))
-                await _terminate_submitted_job(
-                    session=session,
-                    job_model=job_model,
-                    reason=JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY,
-                    message="Failed to use specified fleets",
-                )
-                return
-            if not FeatureFlags.AUTOCREATED_FLEETS_ENABLED:
-                logger.debug("%s: no fleet found", fmt(job_model))
-                # Note: `_get_job_status_message` relies on the "No fleet found" substring to return "no fleets"
-                await _terminate_submitted_job(
-                    session=session,
-                    job_model=job_model,
-                    reason=JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY,
-                    message=(
-                        "No matching fleet found. Possible reasons: "
-                        "https://dstack.ai/docs/guides/troubleshooting/#no-fleets"
-                    ),
-                )
-                return
-        instance = await _assign_job_to_fleet_instance(
-            session=session,
-            fleet_model=fleet_model,
-            instances_with_offers=fleet_instances_with_offers,
-            job_model=job_model,
-            multinode=context.multinode,
-        )
-        await _mark_job_processed(session=session, job_model=job_model)
         return

     jobs_to_provision = _get_jobs_to_provision(job, context.replica_jobs, job_model)
@@ -660,6 +591,109 @@ async def _prepare_job_volumes(
     )


+async def _process_assignment_phase(
+    exit_stack: AsyncExitStack,
+    session: AsyncSession,
+    context: _SubmittedJobContext,
+    master_job_provisioning_data: Optional[JobProvisioningData],
+    volumes: list[list[Volume]],
+) -> None:
+    fleet_model, fleet_instances_with_offers = await _find_assignment_fleet_with_offers(
+        exit_stack=exit_stack,
+        session=session,
+        context=context,
+        master_job_provisioning_data=master_job_provisioning_data,
+        volumes=volumes,
+    )
+    if fleet_model is None:
+        if context.run.run_spec.merged_profile.fleets is not None:
+            # Run cannot create new fleets when fleets are specified
+            logger.debug("%s: failed to use specified fleets", fmt(context.job_model))
+            await _terminate_submitted_job(
+                session=session,
+                job_model=context.job_model,
+                reason=JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY,
+                message="Failed to use specified fleets",
+            )
+            return
+        if not FeatureFlags.AUTOCREATED_FLEETS_ENABLED:
+            logger.debug("%s: no fleet found", fmt(context.job_model))
+            # Note: `_get_job_status_message` relies on the "No fleet found" substring to return "no fleets"
+            await _terminate_submitted_job(
+                session=session,
+                job_model=context.job_model,
+                reason=JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY,
+                message=(
+                    "No matching fleet found. Possible reasons: "
+                    "https://dstack.ai/docs/guides/troubleshooting/#no-fleets"
+                ),
+            )
+            return
+
+    await _assign_job_to_fleet_instance(
+        session=session,
+        fleet_model=fleet_model,
+        instances_with_offers=fleet_instances_with_offers,
+        job_model=context.job_model,
+        multinode=context.multinode,
+    )
+    await _mark_job_processed(session=session, job_model=context.job_model)
+
+
+async def _find_assignment_fleet_with_offers(
+    exit_stack: AsyncExitStack,
+    session: AsyncSession,
+    context: _SubmittedJobContext,
+    master_job_provisioning_data: Optional[JobProvisioningData],
+    volumes: list[list[Volume]],
+) -> tuple[Optional[FleetModel], list[tuple[InstanceModel, InstanceOfferWithAvailability]]]:
+    fleet_filters, instance_filters = await get_run_candidate_fleet_models_filters(
+        session=session,
+        project=context.project,
+        run_model=context.run_model,
+        run_spec=context.run.run_spec,
+    )
+    (
+        fleet_models_with_instances,
+        fleet_models_without_instances,
+    ) = await select_run_candidate_fleet_models_with_filters(
+        session=session,
+        fleet_filters=fleet_filters,
+        instance_filters=instance_filters,
+        lock_instances=True,
+    )
+    instances_ids = sorted(
+        itertools.chain.from_iterable(
+            [i.id for i in f.instances] for f in fleet_models_with_instances
+        )
+    )
+    await sqlite_commit(session)
+    await exit_stack.enter_async_context(
+        get_locker(get_db().dialect_name).lock_ctx(InstanceModel.__tablename__, instances_ids)
+    )
+    if is_db_sqlite():
+        fleets_with_instances_ids = [f.id for f in fleet_models_with_instances]
+        fleet_models_with_instances = await _refetch_fleet_models_with_instances(
+            session=session,
+            fleets_ids=fleets_with_instances_ids,
+            instances_ids=instances_ids,
+            fleet_filters=fleet_filters,
+            instance_filters=instance_filters,
+        )
+    fleet_models = fleet_models_with_instances + fleet_models_without_instances
+    fleet_model, fleet_instances_with_offers, _ = await find_optimal_fleet_with_offers(
+        project=context.project,
+        fleet_models=fleet_models,
+        run_model=context.run_model,
+        run_spec=context.run.run_spec,
+        job=context.job,
+        master_job_provisioning_data=master_job_provisioning_data,
+        volumes=volumes,
+        exclude_not_available=True,
+    )
+    return fleet_model, fleet_instances_with_offers
+
+
 async def _defer_submitted_job(
     session: AsyncSession,
     job_model: JobModel,

From 69521a3beb2678eeac89f0ec1559a1990b33f77e Mon Sep 17 00:00:00 2001
From: Victor Skvortsov
Date: Fri, 13 Mar 2026 13:22:06 +0500
Subject: [PATCH 05/16] Extract submitted-jobs provisioning phase

---
 .../scheduled_tasks/submitted_jobs.py         | 316 ++++++++++--------
 1 file changed, 178 insertions(+), 138 deletions(-)

diff --git a/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py b/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py
index fbf4dc89b..2cabc01f4 100644
--- a/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py
+++ b/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py
@@ -241,6 +241,13 @@ class _PreparedJobVolumes:
     volumes: list[list[Volume]]


+@dataclass
+class _ProvisioningPhaseResult:
+    jobs_to_provision: list[Job]
+    instance: Optional[InstanceModel]
+    need_volume_attachment: bool
+
+
 async def _process_submitted_job(
     exit_stack: AsyncExitStack, session: AsyncSession, job_model: JobModel
 ):
@@ -252,9 +259,7 @@ async def _process_submitted_job(
     project = context.project
     run = context.run
     job = context.job
-    fleet_model = context.fleet_model
     run_spec = run.run_spec
-    run_profile = run_spec.merged_profile
     master_job_dependency = await _resolve_master_job_dependency(
         session=session,
-    taken_instance_nums = await _get_taken_instance_nums(session, fleet_model)
+    need_volume_attachment = True
+    taken_instance_nums = await _get_taken_instance_nums(session, fleet_model)
         # If the volume was deleted before the lock, the volume will fail to attach and the job will fail.
     if fleet_model is None:
     Even when multiple jobs are passed, it may still provision only one instance
        await session.execute(
# Processing should be done in multiple steps so that @@ -692,6 +737,7 @@ async def _process_provisioning_phase( instance_num=instance_num, profile=effective_profile, ) + instances.append(instance) taken_instance_nums.add(instance_num) provisioned_job_model.job_runtime_data = _prepare_job_runtime_data( offer, context.multinode @@ -711,8 +757,8 @@ async def _process_provisioning_phase( return _ProvisioningPhaseResult( jobs_to_provision=jobs_to_provision, - instance=instance, - need_volume_attachment=need_volume_attachment, + instance_models=instances, + compute_group_model=compute_group_model, ) @@ -731,7 +777,9 @@ async def _finalize_submitted_job_processing( volume_models = prepared_job_volumes.volume_models volumes_ids = sorted([v.id for vs in volume_models for v in vs]) - if provisioning_phase_result.need_volume_attachment: + # TODO: Volume attachment for compute groups is not yet supported since + # currently supported compute groups (e.g. Runpod) don't need explicit volume attachment. + if provisioning_phase_result.compute_group_model is None: # Take lock to prevent attaching volumes that are to be deleted. # If the volume was deleted before the lock, the volume will fail to attach and the job will fail. # TODO: Lock instances for attaching volumes? @@ -746,12 +794,12 @@ async def _finalize_submitted_job_processing( get_locker(get_db().dialect_name).lock_ctx(VolumeModel.__tablename__, volumes_ids) ) if len(volume_models) > 0: - assert provisioning_phase_result.instance is not None + assert len(provisioning_phase_result.instance_models) == 1 await _attach_volumes( session=session, project=context.project, job_model=context.job_model, - instance=provisioning_phase_result.instance, + instance=provisioning_phase_result.instance_models[0], volume_models=volume_models, ) await session.commit() From 2dbd44d28d6616c17aadec4611c1fa9c2b07cdb6 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Fri, 13 Mar 2026 14:46:41 +0500 Subject: [PATCH 09/16] Extract submitted-jobs volume attachment --- .../scheduled_tasks/submitted_jobs.py | 95 +++++++++++-------- 1 file changed, 54 insertions(+), 41 deletions(-) diff --git a/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py b/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py index 9b3aa713f..6e0146143 100644 --- a/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py +++ b/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py @@ -260,17 +260,11 @@ async def _process_submitted_job( context = await _load_submitted_job_context(session=session, job_model=job_model) logger.debug("%s: provisioning has started", fmt(context.job_model)) - job_model = context.job_model - run_model = context.run_model - run = context.run - job = context.job - run_spec = run.run_spec - master_job_dependency = await _resolve_master_job_dependency( session=session, - job_model=job_model, - run=run, - job=job, + job_model=context.job_model, + run=context.run, + job=context.job, ) if master_job_dependency is None: return @@ -278,18 +272,18 @@ async def _process_submitted_job( if not await _resolve_fleet_dependency( session=session, - job_model=job_model, - run_model=run_model, - job=job, + job_model=context.job_model, + run_model=context.run_model, + job=context.job, ): return prepared_job_volumes = await _prepare_job_volumes( session=session, - job_model=job_model, + job_model=context.job_model, project=context.project, - run_spec=run_spec, - job=job, + run_spec=context.run.run_spec, + 
job=context.job, ) if prepared_job_volumes is None: return @@ -775,34 +769,53 @@ async def _finalize_submitted_job_processing( jobs_to_provision=provisioning_phase_result.jobs_to_provision, ) - volume_models = prepared_job_volumes.volume_models - volumes_ids = sorted([v.id for vs in volume_models for v in vs]) + await _attach_job_volumes_if_needed( + exit_stack=exit_stack, + session=session, + context=context, + prepared_job_volumes=prepared_job_volumes, + provisioning_phase_result=provisioning_phase_result, + ) + await session.commit() + + +async def _attach_job_volumes_if_needed( + exit_stack: AsyncExitStack, + session: AsyncSession, + context: _SubmittedJobContext, + prepared_job_volumes: _PreparedJobVolumes, + provisioning_phase_result: _ProvisioningPhaseResult, +) -> None: # TODO: Volume attachment for compute groups is not yet supported since # currently supported compute groups (e.g. Runpod) don't need explicit volume attachment. - if provisioning_phase_result.compute_group_model is None: - # Take lock to prevent attaching volumes that are to be deleted. - # If the volume was deleted before the lock, the volume will fail to attach and the job will fail. - # TODO: Lock instances for attaching volumes? - await session.execute( - select(VolumeModel) - .where(VolumeModel.id.in_(volumes_ids)) - .options(joinedload(VolumeModel.user).load_only(UserModel.name)) - .order_by(VolumeModel.id) # take locks in order - .with_for_update(key_share=True, of=VolumeModel) - ) - await exit_stack.enter_async_context( - get_locker(get_db().dialect_name).lock_ctx(VolumeModel.__tablename__, volumes_ids) - ) - if len(volume_models) > 0: - assert len(provisioning_phase_result.instance_models) == 1 - await _attach_volumes( - session=session, - project=context.project, - job_model=context.job_model, - instance=provisioning_phase_result.instance_models[0], - volume_models=volume_models, - ) - await session.commit() + if provisioning_phase_result.compute_group_model is not None: + return + + volume_models = prepared_job_volumes.volume_models + volumes_ids = sorted([v.id for vs in volume_models for v in vs]) + # Take lock to prevent attaching volumes that are to be deleted. + # If the volume was deleted before the lock, the volume will fail to attach and the job will fail. + # TODO: Lock instances for attaching volumes? 
+ await session.execute( + select(VolumeModel) + .where(VolumeModel.id.in_(volumes_ids)) + .options(joinedload(VolumeModel.user).load_only(UserModel.name)) + .order_by(VolumeModel.id) # take locks in order + .with_for_update(key_share=True, of=VolumeModel) + ) + await exit_stack.enter_async_context( + get_locker(get_db().dialect_name).lock_ctx(VolumeModel.__tablename__, volumes_ids) + ) + if len(volume_models) == 0: + return + assert len(provisioning_phase_result.instance_models) == 1 + await _attach_volumes( + session=session, + project=context.project, + job_model=context.job_model, + instance=provisioning_phase_result.instance_models[0], + volume_models=volume_models, + ) async def _defer_submitted_job( From 0ac00c55a0ec5d88c73974107ca48aa423b46565 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Fri, 13 Mar 2026 15:08:46 +0500 Subject: [PATCH 10/16] Extract submitted-jobs instance materialization --- .../scheduled_tasks/submitted_jobs.py | 102 ++++++++++++------ 1 file changed, 72 insertions(+), 30 deletions(-) diff --git a/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py b/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py index 6e0146143..3930f0b52 100644 --- a/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py +++ b/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py @@ -686,13 +686,49 @@ async def _materialize_newly_provisioned_capacity( fleet_model: FleetModel, provision_new_capacity_result: _ProvisionNewCapacityResult, ) -> _ProvisioningPhaseResult: - provisioning_data = provision_new_capacity_result.provisioning_data - offer = provision_new_capacity_result.offer - effective_profile = provision_new_capacity_result.effective_profile - compute_group_model = None + ( + provisioned_jobs, + job_provisioning_datas, + compute_group_model, + ) = _resolve_provisioned_jobs_and_data( + session=session, + context=context, + jobs_to_provision=jobs_to_provision, + fleet_model=fleet_model, + provisioning_data=provision_new_capacity_result.provisioning_data, + ) + + instance_models = await _create_instance_models_for_provisioned_jobs( + session=session, + context=context, + fleet_model=fleet_model, + compute_group_model=compute_group_model, + provisioned_jobs=provisioned_jobs, + job_provisioning_datas=job_provisioning_datas, + offer=provision_new_capacity_result.offer, + effective_profile=provision_new_capacity_result.effective_profile, + ) + + logger.info( + "%s: provisioned %s new instance(s)", + fmt(context.job_model), + len(provisioned_jobs), + ) + return _ProvisioningPhaseResult( + jobs_to_provision=jobs_to_provision, + instance_models=instance_models, + compute_group_model=compute_group_model, + ) + + +def _resolve_provisioned_jobs_and_data( + session: AsyncSession, + context: _SubmittedJobContext, + jobs_to_provision: list[Job], + fleet_model: FleetModel, + provisioning_data: Union[JobProvisioningData, ComputeGroupProvisioningData], +) -> tuple[list[Job], list[JobProvisioningData], Optional[ComputeGroupModel]]: if isinstance(provisioning_data, ComputeGroupProvisioningData): - provisioned_jobs = jobs_to_provision - jpds = provisioning_data.job_provisioning_datas compute_group_model = ComputeGroupModel( id=uuid.uuid4(), project=context.project, @@ -701,59 +737,65 @@ async def _materialize_newly_provisioned_capacity( provisioning_data=provisioning_data.json(), ) session.add(compute_group_model) - else: - provisioned_jobs = [context.job] - jpds = [provisioning_data] + return ( + jobs_to_provision, + 
provisioning_data.job_provisioning_datas, + compute_group_model, + ) + return [context.job], [provisioning_data], None - logger.info( - "%s: provisioned %s new instance(s)", - fmt(context.job_model), - len(provisioned_jobs), - ) + +async def _create_instance_models_for_provisioned_jobs( + session: AsyncSession, + context: _SubmittedJobContext, + fleet_model: FleetModel, + compute_group_model: Optional[ComputeGroupModel], + provisioned_jobs: list[Job], + job_provisioning_datas: list[JobProvisioningData], + offer: InstanceOfferWithAvailability, + effective_profile: Profile, +) -> list[InstanceModel]: provisioned_job_models = _get_job_models_for_jobs(context.run_model.jobs, provisioned_jobs) - instances: list[InstanceModel] = [] + instance_models: list[InstanceModel] = [] # FIXME: Fleet is not locked which may lead to duplicate instance_num. # This is currently hard to fix without locking the fleet for entire provisioning duration. # Processing should be done in multiple steps so that # InstanceModel is created before provisioning. taken_instance_nums = await _get_taken_instance_nums(session, fleet_model) - for provisioned_job_model, jpd in zip(provisioned_job_models, jpds): - provisioned_job_model.job_provisioning_data = jpd.json() + for provisioned_job_model, job_provisioning_data in zip( + provisioned_job_models, job_provisioning_datas + ): + provisioned_job_model.job_provisioning_data = job_provisioning_data.json() switch_job_status(session, provisioned_job_model, JobStatus.PROVISIONING) instance_num = get_next_instance_num(taken_instance_nums) - instance = _create_instance_model_for_job( + instance_model = _create_instance_model_for_job( project=context.project, fleet_model=fleet_model, compute_group_model=compute_group_model, job_model=provisioned_job_model, - job_provisioning_data=jpd, + job_provisioning_data=job_provisioning_data, offer=offer, instance_num=instance_num, profile=effective_profile, ) - instances.append(instance) + instance_models.append(instance_model) taken_instance_nums.add(instance_num) provisioned_job_model.job_runtime_data = _prepare_job_runtime_data( offer, context.multinode ).json() - session.add(instance) + session.add(instance_model) events.emit( session, - f"Instance created for job. Instance status: {instance.status.upper()}", + f"Instance created for job. 
Instance status: {instance_model.status.upper()}", actor=events.SystemActor(), targets=[ - events.Target.from_model(instance), + events.Target.from_model(instance_model), events.Target.from_model(provisioned_job_model), ], ) - provisioned_job_model.used_instance_id = instance.id + provisioned_job_model.used_instance_id = instance_model.id provisioned_job_model.last_processed_at = common_utils.get_current_datetime() - - return _ProvisioningPhaseResult( - jobs_to_provision=jobs_to_provision, - instance_models=instances, - compute_group_model=compute_group_model, - ) + return instance_models async def _finalize_submitted_job_processing( From b5d32b909368b1177d883876b2a1906086470098 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Fri, 13 Mar 2026 15:21:35 +0500 Subject: [PATCH 11/16] Extract submitted-jobs master fleet locking --- .../scheduled_tasks/submitted_jobs.py | 108 ++++++++++-------- 1 file changed, 61 insertions(+), 47 deletions(-) diff --git a/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py b/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py index 3930f0b52..23d835d3a 100644 --- a/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py +++ b/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py @@ -921,56 +921,70 @@ async def _lock_fleet_and_get_master_provisioning_data( # cluster master from loaded fleet instances here. Resolve the current master via # FleetModel.current_master_instance_id so jobs follow the same master election # as FleetPipeline/InstancePipeline. - master_instance_provisioning_data = None - if is_master_job(job) and fleet_model is not None: - fleet_spec = get_fleet_spec(fleet_model) - if fleet_spec.configuration.placement == InstanceGroupPlacement.CLUSTER: - # To avoid violating fleet placement cluster during master provisioning, - # we must lock empty fleets and respect existing instances in non-empty fleets. - # On SQLite always take the lock during master provisioning for simplicity. - # It's fine to lock fleets currently locked by pipelines (with lock_* fields set) - # since we won't update fleets – we only need to ensure there is no parallel provisioning. 
- await exit_stack.enter_async_context( - get_locker(get_db().dialect_name).lock_ctx( - FleetModel.__tablename__, [fleet_model.id] - ) - ) - await sqlite_commit(session) - res = await session.execute( - select(FleetModel) - .where( - FleetModel.id == fleet_model.id, - ~exists().where( - InstanceModel.fleet_id == fleet_model.id, - InstanceModel.deleted == False, - ), - ) - .with_for_update(key_share=True, of=FleetModel) - .execution_options(populate_existing=True) - .options(noload(FleetModel.instances)) - ) - empty_fleet_model = res.unique().scalar() - if empty_fleet_model is not None: - fleet_model = empty_fleet_model - else: - res = await session.execute( - select(FleetModel) - .join(FleetModel.instances) - .where( - FleetModel.id == fleet_model.id, - InstanceModel.deleted == False, - ) - .options(contains_eager(FleetModel.instances)) - .execution_options(populate_existing=True) - ) - fleet_model = res.unique().scalar_one() - master_instance_provisioning_data = get_fleet_master_instance_provisioning_data( - fleet_model=fleet_model, - fleet_spec=fleet_spec, - ) + if not is_master_job(job) or fleet_model is None: + return fleet_model, None + + fleet_spec = _get_cluster_fleet_spec(fleet_model) + if fleet_spec is None: + return fleet_model, None + + # To avoid violating fleet placement cluster during master provisioning, + # we must lock empty fleets and respect existing instances in non-empty fleets. + # On SQLite always take the lock during master provisioning for simplicity. + # It's fine to lock fleets currently locked by pipelines (with lock_* fields set) + # since we won't update fleets – we only need to ensure there is no parallel provisioning. + await exit_stack.enter_async_context( + get_locker(get_db().dialect_name).lock_ctx(FleetModel.__tablename__, [fleet_model.id]) + ) + await sqlite_commit(session) + fleet_model = await _refetch_cluster_master_fleet(session=session, fleet_model=fleet_model) + master_instance_provisioning_data = get_fleet_master_instance_provisioning_data( + fleet_model=fleet_model, + fleet_spec=fleet_spec, + ) return fleet_model, master_instance_provisioning_data +def _get_cluster_fleet_spec(fleet_model: FleetModel) -> Optional[FleetSpec]: + fleet_spec = get_fleet_spec(fleet_model) + if fleet_spec.configuration.placement != InstanceGroupPlacement.CLUSTER: + return None + return fleet_spec + + +async def _refetch_cluster_master_fleet( + session: AsyncSession, fleet_model: FleetModel +) -> FleetModel: + res = await session.execute( + select(FleetModel) + .where( + FleetModel.id == fleet_model.id, + ~exists().where( + InstanceModel.fleet_id == fleet_model.id, + InstanceModel.deleted == False, + ), + ) + .with_for_update(key_share=True, of=FleetModel) + .execution_options(populate_existing=True) + .options(noload(FleetModel.instances)) + ) + empty_fleet_model = res.unique().scalar() + if empty_fleet_model is not None: + return empty_fleet_model + + res = await session.execute( + select(FleetModel) + .join(FleetModel.instances) + .where( + FleetModel.id == fleet_model.id, + InstanceModel.deleted == False, + ) + .options(contains_eager(FleetModel.instances)) + .execution_options(populate_existing=True) + ) + return res.unique().scalar_one() + + async def _assign_existing_instance_to_job( session: AsyncSession, fleet_model: Optional[FleetModel], From a62c7afa56f734def57feec28e5d9c28cabad032 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Mon, 16 Mar 2026 11:53:41 +0500 Subject: [PATCH 12/16] Inline submitted-jobs offer loop --- 
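
Note: the inlining below removes the `else:` around the `compute.run_job` call, since the preceding branch already returns. Reduced to its essentials, the loop keeps trying offers and returns on the first success (illustrative names only; `_BackendError` stands in for `BackendError`, and the real loop also prepares volumes and placement groups per offer):

    # Illustrative sketch of the early-return loop shape after this patch.
    from typing import Optional

    class _BackendError(Exception):
        pass

    def _try_offer(offer: str) -> str:
        if offer == "bad":
            raise _BackendError("no capacity")
        return f"provisioned-on-{offer}"

    def _provision(offers: list[str]) -> Optional[str]:
        for offer in offers:
            try:
                return _try_offer(offer)  # first success returns; no else needed
            except _BackendError:
                continue  # fall through to the next offer
        return None  # every offer failed

    assert _provision(["bad", "good"]) == "provisioned-on-good"
    assert _provision(["bad"]) is None
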
.../scheduled_tasks/submitted_jobs.py | 84 ++++++++++++------- 1 file changed, 52 insertions(+), 32 deletions(-) diff --git a/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py b/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py index 23d835d3a..92eab1ccc 100644 --- a/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py +++ b/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py @@ -51,6 +51,7 @@ JobRuntimeData, JobStatus, JobTerminationReason, + Requirements, Run, ) from dstack._internal.core.models.volumes import Volume @@ -1074,24 +1075,18 @@ async def _provision_new_capacity( and run only the master job in case there are no offers supporting cluster groups. Other jobs should be provisioned one-by-one later. """ + job = jobs[0] if volumes is None: volumes = [] - job = jobs[0] - profile = run.run_spec.merged_profile - requirements = job.job_spec.requirements - if fleet_model is not None: - fleet_spec = get_fleet_spec(fleet_model) - try: - check_can_create_new_cloud_instance_in_fleet(fleet_model, fleet_spec) - profile, requirements = get_run_profile_and_requirements_in_fleet( - job=job, - run_spec=run.run_spec, - fleet_spec=fleet_spec, - ) - except ValueError as e: - logger.debug("%s: %s", fmt(job_model), e.args[0]) - return None - # TODO: Respect fleet provisioning properties such as tags + effective_profile_and_requirements = _get_effective_profile_and_requirements( + job_model=job_model, + run=run, + job=job, + fleet_model=fleet_model, + ) + if effective_profile_and_requirements is None: + return None + profile, requirements = effective_profile_and_requirements # The placement group is determined when provisioning the master instance # and used for all other instances in the fleet. 
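
The new call site unpacks an `Optional[tuple]` after a single `None` check: the fleet-aware fallback logic lives in one helper and failure propagates as `None`. A self-contained sketch of that pattern (all names and the dict-based "fleet" are illustrative, not dstack's API):

    # Hypothetical simplification of _get_effective_profile_and_requirements.
    from typing import Optional

    def _effective_settings(fleet: Optional[dict]) -> Optional[tuple[str, str]]:
        if fleet is None:
            return "run-profile", "job-requirements"  # run-level defaults
        if fleet.get("full"):
            return None  # cannot create a new instance in this fleet
        return fleet["profile"], fleet["requirements"]  # fleet overrides

    settings = _effective_settings({"profile": "p", "requirements": "r"})
    if settings is None:
        raise SystemExit("no capacity")
    profile, requirements = settings  # safe to unpack after the None check
    assert (profile, requirements) == ("p", "r")
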
@@ -1175,22 +1170,21 @@ async def _provision_new_capacity( offer=offer, effective_profile=profile, ) - else: - jpd = await common_utils.run_async( - compute.run_job, - run, - job, - offer, - project_ssh_public_key, - project_ssh_private_key, - offer_volumes, - placement_group_model_to_placement_group_optional(placement_group_model), - ) - return _ProvisionNewCapacityResult( - provisioning_data=jpd, - offer=offer, - effective_profile=profile, - ) + jpd = await common_utils.run_async( + compute.run_job, + run, + job, + offer, + project_ssh_public_key, + project_ssh_private_key, + offer_volumes, + placement_group_model_to_placement_group_optional(placement_group_model), + ) + return _ProvisionNewCapacityResult( + provisioning_data=jpd, + offer=offer, + effective_profile=profile, + ) except BackendError as e: logger.warning( "%s: %s launch in %s/%s failed: %s", @@ -1219,6 +1213,32 @@ async def _provision_new_capacity( return None +def _get_effective_profile_and_requirements( + job_model: JobModel, + run: Run, + job: Job, + fleet_model: Optional[FleetModel], +) -> Optional[tuple[Profile, Requirements]]: + effective_profile = run.run_spec.merged_profile + requirements = job.job_spec.requirements + if fleet_model is None: + return effective_profile, requirements + + fleet_spec = get_fleet_spec(fleet_model) + try: + check_can_create_new_cloud_instance_in_fleet(fleet_model, fleet_spec) + effective_profile, requirements = get_run_profile_and_requirements_in_fleet( + job=job, + run_spec=run.run_spec, + fleet_spec=fleet_spec, + ) + except ValueError as e: + logger.debug("%s: %s", fmt(job_model), e.args[0]) + return None + # TODO: Respect fleet provisioning properties such as tags + return effective_profile, requirements + + async def _create_fleet_model_for_job( exit_stack: AsyncExitStack, session: AsyncSession, From b29118f7c27db1d399700dc9216a88fb6b56c813 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Mon, 16 Mar 2026 12:00:16 +0500 Subject: [PATCH 13/16] Reorder submitted-jobs helpers --- .../scheduled_tasks/submitted_jobs.py | 180 +++++++++--------- 1 file changed, 90 insertions(+), 90 deletions(-) diff --git a/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py b/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py index 92eab1ccc..d8efb007c 100644 --- a/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py +++ b/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py @@ -370,6 +370,18 @@ async def _load_submitted_job_context( ) +def _get_job_models_for_jobs( + job_models: list[JobModel], + jobs: list[Job], +) -> list[JobModel]: + """ + Returns job models of latest submissions for a list of jobs. + Preserves jobs order. 
+ """ + id_to_job_model_map = {jm.id: jm for jm in job_models} + return [id_to_job_model_map[j.job_submissions[-1].id] for j in jobs] + + async def _resolve_master_job_dependency( session: AsyncSession, job_model: JobModel, @@ -799,6 +811,84 @@ async def _create_instance_models_for_provisioned_jobs( return instance_models +async def _get_taken_instance_nums(session: AsyncSession, fleet_model: FleetModel) -> set[int]: + res = await session.execute( + select(InstanceModel.instance_num).where( + InstanceModel.fleet_id == fleet_model.id, + InstanceModel.deleted.is_(False), + ) + ) + return set(res.scalars().all()) + + +def _create_instance_model_for_job( + project: ProjectModel, + fleet_model: FleetModel, + compute_group_model: Optional[ComputeGroupModel], + job_model: JobModel, + job_provisioning_data: JobProvisioningData, + offer: InstanceOfferWithAvailability, + instance_num: int, + profile: Profile, +) -> InstanceModel: + if not job_provisioning_data.dockerized: + # terminate vastai/k8s instances immediately + termination_policy = TerminationPolicy.DESTROY_AFTER_IDLE + termination_idle_time = 0 + else: + termination_policy, termination_idle_time = get_termination( + profile, DEFAULT_RUN_TERMINATION_IDLE_TIME + ) + instance = InstanceModel( + id=uuid.uuid4(), + name=f"{fleet_model.name}-{instance_num}", + instance_num=instance_num, + project=project, + fleet=fleet_model, + compute_group=compute_group_model, + created_at=common_utils.get_current_datetime(), + started_at=common_utils.get_current_datetime(), + status=InstanceStatus.PROVISIONING, + unreachable=False, + job_provisioning_data=job_provisioning_data.json(), + offer=offer.json(), + termination_policy=termination_policy, + termination_idle_time=termination_idle_time, + jobs=[job_model], + backend=offer.backend, + price=offer.price, + region=offer.region, + volume_attachments=[], + total_blocks=1, + busy_blocks=1, + ) + return instance + + +def _prepare_job_runtime_data( + offer: InstanceOfferWithAvailability, multinode: bool +) -> JobRuntimeData: + if offer.blocks == offer.total_blocks: + if settings.JOB_NETWORK_MODE == settings.JobNetworkMode.FORCED_BRIDGE: + network_mode = NetworkMode.BRIDGE + elif settings.JOB_NETWORK_MODE == settings.JobNetworkMode.HOST_WHEN_POSSIBLE: + network_mode = NetworkMode.HOST + else: + assert settings.JOB_NETWORK_MODE == settings.JobNetworkMode.HOST_FOR_MULTINODE_ONLY + network_mode = NetworkMode.HOST if multinode else NetworkMode.BRIDGE + return JobRuntimeData( + network_mode=network_mode, + offer=offer, + ) + return JobRuntimeData( + network_mode=NetworkMode.BRIDGE, + offer=offer, + cpu=offer.instance.resources.cpus, + gpu=len(offer.instance.resources.gpus), + memory=Memory(offer.instance.resources.memory_mib / 1024), + ) + + async def _finalize_submitted_job_processing( exit_stack: AsyncExitStack, session: AsyncSession, @@ -1286,84 +1376,6 @@ async def _create_fleet_model_for_job( return fleet_model -async def _get_taken_instance_nums(session: AsyncSession, fleet_model: FleetModel) -> set[int]: - res = await session.execute( - select(InstanceModel.instance_num).where( - InstanceModel.fleet_id == fleet_model.id, - InstanceModel.deleted.is_(False), - ) - ) - return set(res.scalars().all()) - - -def _create_instance_model_for_job( - project: ProjectModel, - fleet_model: FleetModel, - compute_group_model: Optional[ComputeGroupModel], - job_model: JobModel, - job_provisioning_data: JobProvisioningData, - offer: InstanceOfferWithAvailability, - instance_num: int, - profile: Profile, -) -> InstanceModel: - 
if not job_provisioning_data.dockerized: - # terminate vastai/k8s instances immediately - termination_policy = TerminationPolicy.DESTROY_AFTER_IDLE - termination_idle_time = 0 - else: - termination_policy, termination_idle_time = get_termination( - profile, DEFAULT_RUN_TERMINATION_IDLE_TIME - ) - instance = InstanceModel( - id=uuid.uuid4(), - name=f"{fleet_model.name}-{instance_num}", - instance_num=instance_num, - project=project, - fleet=fleet_model, - compute_group=compute_group_model, - created_at=common_utils.get_current_datetime(), - started_at=common_utils.get_current_datetime(), - status=InstanceStatus.PROVISIONING, - unreachable=False, - job_provisioning_data=job_provisioning_data.json(), - offer=offer.json(), - termination_policy=termination_policy, - termination_idle_time=termination_idle_time, - jobs=[job_model], - backend=offer.backend, - price=offer.price, - region=offer.region, - volume_attachments=[], - total_blocks=1, - busy_blocks=1, - ) - return instance - - -def _prepare_job_runtime_data( - offer: InstanceOfferWithAvailability, multinode: bool -) -> JobRuntimeData: - if offer.blocks == offer.total_blocks: - if settings.JOB_NETWORK_MODE == settings.JobNetworkMode.FORCED_BRIDGE: - network_mode = NetworkMode.BRIDGE - elif settings.JOB_NETWORK_MODE == settings.JobNetworkMode.HOST_WHEN_POSSIBLE: - network_mode = NetworkMode.HOST - else: - assert settings.JOB_NETWORK_MODE == settings.JobNetworkMode.HOST_FOR_MULTINODE_ONLY - network_mode = NetworkMode.HOST if multinode else NetworkMode.BRIDGE - return JobRuntimeData( - network_mode=network_mode, - offer=offer, - ) - return JobRuntimeData( - network_mode=NetworkMode.BRIDGE, - offer=offer, - cpu=offer.instance.resources.cpus, - gpu=len(offer.instance.resources.gpus), - memory=Memory(offer.instance.resources.memory_mib / 1024), - ) - - def _get_offer_volumes( volumes: List[List[Volume]], offer: InstanceOfferWithAvailability, @@ -1480,15 +1492,3 @@ async def _attach_volume( instance.volume_attachments.append(volume_attachment_model) volume_model.last_job_processed_at = common_utils.get_current_datetime() - - -def _get_job_models_for_jobs( - job_models: list[JobModel], - jobs: list[Job], -) -> list[JobModel]: - """ - Returns job models of latest submissions for a list of jobs. - Preserves jobs order. 
- """ - id_to_job_model_map = {jm.id: jm for jm in job_models} - return [id_to_job_model_map[j.job_submissions[-1].id] for j in jobs] From 8a10fc7a4f4ecaced2788b3b312a26250942abc2 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Mon, 16 Mar 2026 12:11:36 +0500 Subject: [PATCH 14/16] Simplify submitted-jobs termination helper --- .../background/scheduled_tasks/submitted_jobs.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py b/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py index d8efb007c..b155a9dcd 100644 --- a/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py +++ b/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py @@ -4,7 +4,7 @@ from contextlib import AsyncExitStack from dataclasses import dataclass from datetime import datetime, timedelta -from typing import List, Optional, Union, cast +from typing import List, Optional, Union from sqlalchemy import exists, func, select from sqlalchemy.ext.asyncio import AsyncSession @@ -53,6 +53,7 @@ JobTerminationReason, Requirements, Run, + RunSpec, ) from dstack._internal.core.models.volumes import Volume from dstack._internal.core.services.profiles import get_termination @@ -427,7 +428,7 @@ async def _prepare_job_volumes( session: AsyncSession, job_model: JobModel, project: ProjectModel, - run_spec, + run_spec: RunSpec, job: Job, ) -> Optional[_PreparedJobVolumes]: try: @@ -964,11 +965,11 @@ async def _terminate_submitted_job( session: AsyncSession, job_model: JobModel, reason: JobTerminationReason, - message: object = common_utils.UNSET, + message: Optional[str] = None, ): job_model.termination_reason = reason - if message is not common_utils.UNSET: - job_model.termination_reason_message = cast(Optional[str], message) + if message is not None: + job_model.termination_reason_message = message switch_job_status(session, job_model, JobStatus.TERMINATING) await _mark_job_processed(session=session, job_model=job_model) From 1505c080cd5f2e0329328e638835e62abb07a00f Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Mon, 16 Mar 2026 12:20:57 +0500 Subject: [PATCH 15/16] Move submitted-jobs provisioning inputs to context --- .../scheduled_tasks/submitted_jobs.py | 22 +++++-------------- 1 file changed, 5 insertions(+), 17 deletions(-) diff --git a/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py b/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py index b155a9dcd..31b168304 100644 --- a/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py +++ b/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py @@ -225,6 +225,7 @@ class _SubmittedJobContext: project: ProjectModel run: Run job: Job + jobs_to_provision: list[Job] replica_jobs: list[Job] replica_job_models: list[JobModel] fleet_model: Optional[FleetModel] @@ -244,7 +245,6 @@ class _PreparedJobVolumes: @dataclass class _ProvisioningPhaseResult: - jobs_to_provision: list[Job] instance_models: list[InstanceModel] compute_group_model: Optional[ComputeGroupModel] @@ -360,6 +360,7 @@ async def _load_submitted_job_context( project=run_model.project, run=run, job=job, + jobs_to_provision=_select_jobs_to_provision(job, replica_jobs, job_model), replica_jobs=replica_jobs, replica_job_models=_get_job_models_for_jobs(run_model.jobs, replica_jobs), # Master job chooses fleet for the run. 
@@ -572,14 +573,10 @@ async def _process_provisioning_phase( master_job_provisioning_data: Optional[JobProvisioningData], volumes: list[list[Volume]], ) -> Optional[_ProvisioningPhaseResult]: - jobs_to_provision = _select_jobs_to_provision( - context.job, context.replica_jobs, context.job_model - ) if context.job_model.instance is not None: return await _process_existing_instance_provisioning_path( session=session, job_model=context.job_model, - jobs_to_provision=jobs_to_provision, ) if context.run.run_spec.merged_profile.creation_policy == CreationPolicy.REUSE: @@ -596,7 +593,6 @@ async def _process_provisioning_phase( exit_stack=exit_stack, session=session, context=context, - jobs_to_provision=jobs_to_provision, master_job_provisioning_data=master_job_provisioning_data, volumes=volumes, ) @@ -605,7 +601,6 @@ async def _process_provisioning_phase( async def _process_existing_instance_provisioning_path( session: AsyncSession, job_model: JobModel, - jobs_to_provision: list[Job], ) -> _ProvisioningPhaseResult: assert job_model.instance is not None res = await session.execute( @@ -617,7 +612,6 @@ async def _process_existing_instance_provisioning_path( instance = res.unique().scalar_one() switch_job_status(session, job_model, JobStatus.PROVISIONING) return _ProvisioningPhaseResult( - jobs_to_provision=jobs_to_provision, instance_models=[instance], compute_group_model=None, ) @@ -627,7 +621,6 @@ async def _process_new_capacity_provisioning_path( exit_stack: AsyncExitStack, session: AsyncSession, context: _SubmittedJobContext, - jobs_to_provision: list[Job], master_job_provisioning_data: Optional[JobProvisioningData], volumes: list[list[Volume]], ) -> Optional[_ProvisioningPhaseResult]: @@ -650,7 +643,7 @@ async def _process_new_capacity_provisioning_path( fleet_model=fleet_model, job_model=context.job_model, run=context.run, - jobs=jobs_to_provision, + jobs=context.jobs_to_provision, project_ssh_public_key=context.project.ssh_public_key, project_ssh_private_key=context.project.ssh_private_key, master_job_provisioning_data=master_provisioning_data, @@ -687,7 +680,6 @@ async def _process_new_capacity_provisioning_path( return await _materialize_newly_provisioned_capacity( session=session, context=context, - jobs_to_provision=jobs_to_provision, fleet_model=fleet_model, provision_new_capacity_result=provision_new_capacity_result, ) @@ -696,7 +688,6 @@ async def _process_new_capacity_provisioning_path( async def _materialize_newly_provisioned_capacity( session: AsyncSession, context: _SubmittedJobContext, - jobs_to_provision: list[Job], fleet_model: FleetModel, provision_new_capacity_result: _ProvisionNewCapacityResult, ) -> _ProvisioningPhaseResult: @@ -707,7 +698,6 @@ async def _materialize_newly_provisioned_capacity( ) = _resolve_provisioned_jobs_and_data( session=session, context=context, - jobs_to_provision=jobs_to_provision, fleet_model=fleet_model, provisioning_data=provision_new_capacity_result.provisioning_data, ) @@ -729,7 +719,6 @@ async def _materialize_newly_provisioned_capacity( len(provisioned_jobs), ) return _ProvisioningPhaseResult( - jobs_to_provision=jobs_to_provision, instance_models=instance_models, compute_group_model=compute_group_model, ) @@ -738,7 +727,6 @@ async def _materialize_newly_provisioned_capacity( def _resolve_provisioned_jobs_and_data( session: AsyncSession, context: _SubmittedJobContext, - jobs_to_provision: list[Job], fleet_model: FleetModel, provisioning_data: Union[JobProvisioningData, ComputeGroupProvisioningData], ) -> tuple[list[Job], 
list[JobProvisioningData], Optional[ComputeGroupModel]]: @@ -752,7 +740,7 @@ def _resolve_provisioned_jobs_and_data( ) session.add(compute_group_model) return ( - jobs_to_provision, + context.jobs_to_provision, provisioning_data.job_provisioning_datas, compute_group_model, ) @@ -900,7 +888,7 @@ async def _finalize_submitted_job_processing( _release_replica_jobs_from_master_wait( context.job_model, replica_job_models=context.replica_job_models, - jobs_to_provision=provisioning_phase_result.jobs_to_provision, + jobs_to_provision=context.jobs_to_provision, ) await _attach_job_volumes_if_needed( From 8e1516472dbf188563339f9c3296a97659e945f6 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Mon, 16 Mar 2026 12:25:50 +0500 Subject: [PATCH 16/16] Clean up submitted-jobs minor issues --- .../scheduled_tasks/submitted_jobs.py | 24 +++++++++---------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py b/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py index 31b168304..52217eefb 100644 --- a/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py +++ b/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py @@ -4,7 +4,7 @@ from contextlib import AsyncExitStack from dataclasses import dataclass from datetime import datetime, timedelta -from typing import List, Optional, Union +from typing import Optional, Union from sqlalchemy import exists, func, select from sqlalchemy.ext.asyncio import AsyncSession @@ -696,11 +696,12 @@ async def _materialize_newly_provisioned_capacity( job_provisioning_datas, compute_group_model, ) = _resolve_provisioned_jobs_and_data( - session=session, context=context, fleet_model=fleet_model, provisioning_data=provision_new_capacity_result.provisioning_data, ) + if compute_group_model is not None: + session.add(compute_group_model) instance_models = await _create_instance_models_for_provisioned_jobs( session=session, @@ -725,7 +726,6 @@ async def _materialize_newly_provisioned_capacity( def _resolve_provisioned_jobs_and_data( - session: AsyncSession, context: _SubmittedJobContext, fleet_model: FleetModel, provisioning_data: Union[JobProvisioningData, ComputeGroupProvisioningData], @@ -738,7 +738,6 @@ def _resolve_provisioned_jobs_and_data( status=ComputeGroupStatus.RUNNING, provisioning_data=provisioning_data.json(), ) - session.add(compute_group_model) return ( context.jobs_to_provision, provisioning_data.job_provisioning_datas, @@ -914,6 +913,8 @@ async def _attach_job_volumes_if_needed( return volume_models = prepared_job_volumes.volume_models + if len(volume_models) == 0: + return volumes_ids = sorted([v.id for vs in volume_models for v in vs]) # Take lock to prevent attaching volumes that are to be deleted. # If the volume was deleted before the lock, the volume will fail to attach and the job will fail. 
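
Hoisting the `len(volume_models) == 0` check ahead of the lock (the second hunk below removes it from its old position after the lock) means jobs without volumes never acquire volume locks at all. A minimal sketch of the check-before-lock ordering, with a trivial stand-in locker (`_lock` is hypothetical; the real code combines `get_locker(...).lock_ctx(...)` with a `SELECT ... FOR UPDATE` on `VolumeModel`):

    # Early return before any locking work when there is nothing to attach.
    import asyncio
    from contextlib import asynccontextmanager

    @asynccontextmanager
    async def _lock(ids: list[int]):
        print(f"locked {ids}")
        try:
            yield
        finally:
            print(f"unlocked {ids}")

    async def _attach_if_needed(volume_ids: list[int]) -> None:
        if not volume_ids:
            return  # nothing to attach: skip locking entirely
        async with _lock(sorted(volume_ids)):  # sorted: take locks in order
            print("attaching", volume_ids)

    asyncio.run(_attach_if_needed([]))      # no lock taken
    asyncio.run(_attach_if_needed([3, 1]))  # locks [1, 3], then attaches
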
@@ -928,8 +929,6 @@ async def _attach_job_volumes_if_needed( await exit_stack.enter_async_context( get_locker(get_db().dialect_name).lock_ctx(VolumeModel.__tablename__, volumes_ids) ) - if len(volume_models) == 0: - return assert len(provisioning_phase_result.instance_models) == 1 await _attach_volumes( session=session, @@ -1071,11 +1070,11 @@ async def _assign_existing_instance_to_job( job_model: JobModel, instances_with_offers: list[tuple[InstanceModel, InstanceOfferWithAvailability]], multinode: bool, -) -> Optional[InstanceModel]: +) -> None: job_model.fleet = fleet_model job_model.instance_assigned = True if len(instances_with_offers) == 0: - return None + return instances_with_offers.sort(key=lambda instance_with_offer: instance_with_offer[0].price or 0) instance, offer = instances_with_offers[0] @@ -1105,7 +1104,6 @@ async def _assign_existing_instance_to_job( events.Target.from_model(instance), ], ) - return instance def _select_jobs_to_provision(job: Job, replica_jobs: list[Job], job_model: JobModel) -> list[Job]: @@ -1366,9 +1364,9 @@ async def _create_fleet_model_for_job( def _get_offer_volumes( - volumes: List[List[Volume]], + volumes: list[list[Volume]], offer: InstanceOfferWithAvailability, -) -> List[Volume]: +) -> list[Volume]: """ Returns volumes suitable for the offer for each mount point. """ @@ -1379,7 +1377,7 @@ def _get_offer_volumes( def _get_offer_mount_point_volume( - volumes: List[Volume], + volumes: list[Volume], offer: InstanceOfferWithAvailability, ) -> Volume: """ @@ -1400,7 +1398,7 @@ async def _attach_volumes( project: ProjectModel, job_model: JobModel, instance: InstanceModel, - volume_models: List[List[VolumeModel]], + volume_models: list[list[VolumeModel]], ): job_provisioning_data = common_utils.get_or_error(get_instance_provisioning_data(instance)) backend = await get_project_backend_by_type_or_error(