diff --git a/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py b/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py index 87b44b50a..52217eefb 100644 --- a/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py +++ b/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py @@ -2,8 +2,9 @@ import itertools import uuid from contextlib import AsyncExitStack +from dataclasses import dataclass from datetime import datetime, timedelta -from typing import List, Optional, Union +from typing import Optional, Union from sqlalchemy import exists, func, select from sqlalchemy.ext.asyncio import AsyncSession @@ -52,6 +53,7 @@ JobTerminationReason, Requirements, Run, + RunSpec, ) from dstack._internal.core.models.volumes import Volume from dstack._internal.core.services.profiles import get_termination @@ -216,9 +218,114 @@ async def _process_next_submitted_job(): last_processed_at = common_utils.get_current_datetime() +@dataclass +class _SubmittedJobContext: + job_model: JobModel + run_model: RunModel + project: ProjectModel + run: Run + job: Job + jobs_to_provision: list[Job] + replica_jobs: list[Job] + replica_job_models: list[JobModel] + fleet_model: Optional[FleetModel] + multinode: bool + + +@dataclass +class _MasterJobDependency: + provisioning_data: Optional[JobProvisioningData] + + +@dataclass +class _PreparedJobVolumes: + volume_models: list[list[VolumeModel]] + volumes: list[list[Volume]] + + +@dataclass +class _ProvisioningPhaseResult: + instance_models: list[InstanceModel] + compute_group_model: Optional[ComputeGroupModel] + + +@dataclass +class _ProvisionNewCapacityResult: + provisioning_data: Union[JobProvisioningData, ComputeGroupProvisioningData] + offer: InstanceOfferWithAvailability + effective_profile: Profile + + async def _process_submitted_job( exit_stack: AsyncExitStack, session: AsyncSession, job_model: JobModel ): + context = await _load_submitted_job_context(session=session, job_model=job_model) + logger.debug("%s: provisioning has started", fmt(context.job_model)) + + master_job_dependency = await _resolve_master_job_dependency( + session=session, + job_model=context.job_model, + run=context.run, + job=context.job, + ) + if master_job_dependency is None: + return + master_job_provisioning_data = master_job_dependency.provisioning_data + + if not await _resolve_fleet_dependency( + session=session, + job_model=context.job_model, + run_model=context.run_model, + job=context.job, + ): + return + + prepared_job_volumes = await _prepare_job_volumes( + session=session, + job_model=context.job_model, + project=context.project, + run_spec=context.run.run_spec, + job=context.job, + ) + if prepared_job_volumes is None: + return + + # Submitted jobs processing happens in two steps (transactions). + # First, the jobs gets an instance assigned (or no instance). + # Then, the job runs on the assigned instance or a new instance is provisioned. + # This is needed to avoid holding instances lock for a long time. + if not context.job_model.instance_assigned: + await _process_assignment_phase( + exit_stack=exit_stack, + session=session, + context=context, + master_job_provisioning_data=master_job_provisioning_data, + volumes=prepared_job_volumes.volumes, + ) + return + + provisioning_phase_result = await _process_provisioning_phase( + exit_stack=exit_stack, + session=session, + context=context, + master_job_provisioning_data=master_job_provisioning_data, + volumes=prepared_job_volumes.volumes, + ) + if provisioning_phase_result is None: + return + + await _finalize_submitted_job_processing( + exit_stack=exit_stack, + session=session, + context=context, + prepared_job_volumes=prepared_job_volumes, + provisioning_phase_result=provisioning_phase_result, + ) + + +async def _load_submitted_job_context( + session: AsyncSession, job_model: JobModel +) -> _SubmittedJobContext: # Refetch to load related attributes. res = await session.execute( select(JobModel) @@ -243,40 +350,88 @@ async def _process_submitted_job( ) ) run_model = res.unique().scalar_one() - logger.debug("%s: provisioning has started", fmt(job_model)) - project = run_model.project run = run_model_to_run(run_model) - run_spec = run.run_spec - run_profile = run_spec.merged_profile job = find_job(run.jobs, job_model.replica_num, job_model.job_num) replica_jobs = find_jobs(run.jobs, replica_num=job_model.replica_num) - replica_job_models = _get_job_models_for_jobs(run_model.jobs, replica_jobs) - multinode = job.job_spec.jobs_per_replica > 1 + return _SubmittedJobContext( + job_model=job_model, + run_model=run_model, + project=run_model.project, + run=run, + job=job, + jobs_to_provision=_select_jobs_to_provision(job, replica_jobs, job_model), + replica_jobs=replica_jobs, + replica_job_models=_get_job_models_for_jobs(run_model.jobs, replica_jobs), + # Master job chooses fleet for the run. + # Due to two-step processing, it's saved to job_model.fleet. + # Other jobs just inherit fleet from run_model.fleet. + # If master job chooses no fleet, the new fleet will be created. + fleet_model=run_model.fleet or job_model.fleet, + multinode=job.job_spec.jobs_per_replica > 1, + ) + + +def _get_job_models_for_jobs( + job_models: list[JobModel], + jobs: list[Job], +) -> list[JobModel]: + """ + Returns job models of latest submissions for a list of jobs. + Preserves jobs order. + """ + id_to_job_model_map = {jm.id: jm for jm in job_models} + return [id_to_job_model_map[j.job_submissions[-1].id] for j in jobs] - # Master job chooses fleet for the run. - # Due to two-step processing, it's saved to job_model.fleet. - # Other jobs just inherit fleet from run_model.fleet. - # If master job chooses no fleet, the new fleet will be created. - fleet_model = run_model.fleet or job_model.fleet +async def _resolve_master_job_dependency( + session: AsyncSession, + job_model: JobModel, + run: Run, + job: Job, +) -> Optional[_MasterJobDependency]: + if job.job_spec.job_num == 0: + return _MasterJobDependency(provisioning_data=None) master_job = find_job(run.jobs, job_model.replica_num, 0) - master_job_provisioning_data = None - if job.job_spec.job_num != 0: - if master_job.job_submissions[-1].job_provisioning_data is None: - logger.debug("%s: waiting for master job to be provisioned", fmt(job_model)) - job_model.last_processed_at = common_utils.get_current_datetime() - await session.commit() - return - master_job_provisioning_data = JobProvisioningData.__response__.parse_obj( + if master_job.job_submissions[-1].job_provisioning_data is None: + await _defer_submitted_job( + session=session, + job_model=job_model, + log_message="waiting for master job to be provisioned", + ) + return None + return _MasterJobDependency( + provisioning_data=JobProvisioningData.__response__.parse_obj( master_job.job_submissions[-1].job_provisioning_data ) - if job.job_spec.job_num != 0 or job.job_spec.replica_num != 0: - if run_model.fleet_id is None: - logger.debug("%s: waiting for the run to be assigned to the fleet", fmt(job_model)) - job_model.last_processed_at = common_utils.get_current_datetime() - await session.commit() - return + ) + + +async def _resolve_fleet_dependency( + session: AsyncSession, + job_model: JobModel, + run_model: RunModel, + job: Job, +) -> bool: + if job.job_spec.job_num == 0 and job.job_spec.replica_num == 0: + return True + if run_model.fleet_id is not None: + return True + await _defer_submitted_job( + session=session, + job_model=job_model, + log_message="waiting for the run to be assigned to the fleet", + ) + return False + + +async def _prepare_job_volumes( + session: AsyncSession, + job_model: JobModel, + project: ProjectModel, + run_spec: RunSpec, + job: Job, +) -> Optional[_PreparedJobVolumes]: try: volume_models = await get_job_configured_volume_models( session=session, @@ -295,256 +450,519 @@ async def _process_submitted_job( check_can_attach_job_volumes(volumes) except ServerClientError as e: logger.warning("%s: failed to prepare run volumes: %s", fmt(job_model), repr(e)) - job_model.termination_reason = JobTerminationReason.VOLUME_ERROR - job_model.termination_reason_message = e.msg - switch_job_status(session, job_model, JobStatus.TERMINATING) - job_model.last_processed_at = common_utils.get_current_datetime() - await session.commit() - return - - # Submitted jobs processing happens in two steps (transactions). - # First, the jobs gets an instance assigned (or no instance). - # Then, the job runs on the assigned instance or a new instance is provisioned. - # This is needed to avoid holding instances lock for a long time. - if not job_model.instance_assigned: - fleet_filters, instance_filters = await get_run_candidate_fleet_models_filters( + await _terminate_submitted_job( session=session, - project=project, - run_model=run_model, - run_spec=run_spec, + job_model=job_model, + reason=JobTerminationReason.VOLUME_ERROR, + message=e.msg, ) - ( - fleet_models_with_instances, - fleet_models_without_instances, - ) = await select_run_candidate_fleet_models_with_filters( + return None + return _PreparedJobVolumes( + volume_models=volume_models, + volumes=volumes, + ) + + +async def _process_assignment_phase( + exit_stack: AsyncExitStack, + session: AsyncSession, + context: _SubmittedJobContext, + master_job_provisioning_data: Optional[JobProvisioningData], + volumes: list[list[Volume]], +) -> None: + fleet_model, fleet_instances_with_offers = await _find_assignment_fleet_with_offers( + exit_stack=exit_stack, + session=session, + context=context, + master_job_provisioning_data=master_job_provisioning_data, + volumes=volumes, + ) + if fleet_model is None: + if context.run.run_spec.merged_profile.fleets is not None: + # Run cannot create new fleets when fleets are specified + logger.debug("%s: failed to use specified fleets", fmt(context.job_model)) + await _terminate_submitted_job( + session=session, + job_model=context.job_model, + reason=JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY, + message="Failed to use specified fleets", + ) + return + if not FeatureFlags.AUTOCREATED_FLEETS_ENABLED: + logger.debug("%s: no fleet found", fmt(context.job_model)) + # Note: `_get_job_status_message` relies on the "No fleet found" substring to return "no fleets" + await _terminate_submitted_job( + session=session, + job_model=context.job_model, + reason=JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY, + message=( + "No matching fleet found. Possible reasons: " + "https://dstack.ai/docs/guides/troubleshooting/#no-fleets" + ), + ) + return + + await _assign_existing_instance_to_job( + session=session, + fleet_model=fleet_model, + instances_with_offers=fleet_instances_with_offers, + job_model=context.job_model, + multinode=context.multinode, + ) + await _mark_job_processed(session=session, job_model=context.job_model) + + +async def _find_assignment_fleet_with_offers( + exit_stack: AsyncExitStack, + session: AsyncSession, + context: _SubmittedJobContext, + master_job_provisioning_data: Optional[JobProvisioningData], + volumes: list[list[Volume]], +) -> tuple[Optional[FleetModel], list[tuple[InstanceModel, InstanceOfferWithAvailability]]]: + fleet_filters, instance_filters = await get_run_candidate_fleet_models_filters( + session=session, + project=context.project, + run_model=context.run_model, + run_spec=context.run.run_spec, + ) + ( + fleet_models_with_instances, + fleet_models_without_instances, + ) = await select_run_candidate_fleet_models_with_filters( + session=session, + fleet_filters=fleet_filters, + instance_filters=instance_filters, + lock_instances=True, + ) + instances_ids = sorted( + itertools.chain.from_iterable( + [i.id for i in f.instances] for f in fleet_models_with_instances + ) + ) + await sqlite_commit(session) + await exit_stack.enter_async_context( + get_locker(get_db().dialect_name).lock_ctx(InstanceModel.__tablename__, instances_ids) + ) + if is_db_sqlite(): + fleets_with_instances_ids = [f.id for f in fleet_models_with_instances] + fleet_models_with_instances = await _refetch_fleet_models_with_instances( session=session, + fleets_ids=fleets_with_instances_ids, + instances_ids=instances_ids, fleet_filters=fleet_filters, instance_filters=instance_filters, - lock_instances=True, ) - instances_ids = sorted( - itertools.chain.from_iterable( - [i.id for i in f.instances] for f in fleet_models_with_instances - ) - ) - await sqlite_commit(session) - await exit_stack.enter_async_context( - get_locker(get_db().dialect_name).lock_ctx(InstanceModel.__tablename__, instances_ids) - ) - if is_db_sqlite(): - fleets_with_instances_ids = [f.id for f in fleet_models_with_instances] - fleet_models_with_instances = await _refetch_fleet_models_with_instances( - session=session, - fleets_ids=fleets_with_instances_ids, - instances_ids=instances_ids, - fleet_filters=fleet_filters, - instance_filters=instance_filters, - ) - fleet_models = fleet_models_with_instances + fleet_models_without_instances - fleet_model, fleet_instances_with_offers, _ = await find_optimal_fleet_with_offers( - project=project, - fleet_models=fleet_models, - run_model=run_model, - run_spec=run.run_spec, - job=job, - master_job_provisioning_data=master_job_provisioning_data, - volumes=volumes, - exclude_not_available=True, + fleet_models = fleet_models_with_instances + fleet_models_without_instances + fleet_model, fleet_instances_with_offers, _ = await find_optimal_fleet_with_offers( + project=context.project, + fleet_models=fleet_models, + run_model=context.run_model, + run_spec=context.run.run_spec, + job=context.job, + master_job_provisioning_data=master_job_provisioning_data, + volumes=volumes, + exclude_not_available=True, + ) + return fleet_model, fleet_instances_with_offers + + +async def _process_provisioning_phase( + exit_stack: AsyncExitStack, + session: AsyncSession, + context: _SubmittedJobContext, + master_job_provisioning_data: Optional[JobProvisioningData], + volumes: list[list[Volume]], +) -> Optional[_ProvisioningPhaseResult]: + if context.job_model.instance is not None: + return await _process_existing_instance_provisioning_path( + session=session, + job_model=context.job_model, ) - if fleet_model is None: - if run_spec.merged_profile.fleets is not None: - # Run cannot create new fleets when fleets are specified - logger.debug("%s: failed to use specified fleets", fmt(job_model)) - job_model.termination_reason = ( - JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY - ) - job_model.termination_reason_message = "Failed to use specified fleets" - switch_job_status(session, job_model, JobStatus.TERMINATING) - job_model.last_processed_at = common_utils.get_current_datetime() - await session.commit() - return - if not FeatureFlags.AUTOCREATED_FLEETS_ENABLED: - logger.debug("%s: no fleet found", fmt(job_model)) - job_model.termination_reason = ( - JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY - ) - # Note: `_get_job_status_message` relies on the "No fleet found" substring to return "no fleets" - job_model.termination_reason_message = ( - "No matching fleet found. Possible reasons: " - "https://dstack.ai/docs/guides/troubleshooting/#no-fleets" - ) - switch_job_status(session, job_model, JobStatus.TERMINATING) - job_model.last_processed_at = common_utils.get_current_datetime() - await session.commit() - return - instance = await _assign_job_to_fleet_instance( + + if context.run.run_spec.merged_profile.creation_policy == CreationPolicy.REUSE: + logger.debug("%s: reuse instance failed", fmt(context.job_model)) + await _terminate_submitted_job( session=session, - fleet_model=fleet_model, - instances_with_offers=fleet_instances_with_offers, - job_model=job_model, - multinode=multinode, + job_model=context.job_model, + reason=JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY, + message="Could not reuse any instances for this job", ) - job_model.last_processed_at = common_utils.get_current_datetime() - await session.commit() - return + return None - jobs_to_provision = _get_jobs_to_provision(job, replica_jobs, job_model) - # TODO: Volume attachment for compute groups is not yet supported since - # currently supported compute groups (e.g. Runpod) don't need explicit volume attachment. - need_volume_attachment = True - - if job_model.instance is not None: - res = await session.execute( - select(InstanceModel) - .where(InstanceModel.id == job_model.instance.id) - .options(selectinload(InstanceModel.volume_attachments)) - .execution_options(populate_existing=True) + return await _process_new_capacity_provisioning_path( + exit_stack=exit_stack, + session=session, + context=context, + master_job_provisioning_data=master_job_provisioning_data, + volumes=volumes, + ) + + +async def _process_existing_instance_provisioning_path( + session: AsyncSession, + job_model: JobModel, +) -> _ProvisioningPhaseResult: + assert job_model.instance is not None + res = await session.execute( + select(InstanceModel) + .where(InstanceModel.id == job_model.instance.id) + .options(selectinload(InstanceModel.volume_attachments)) + .execution_options(populate_existing=True) + ) + instance = res.unique().scalar_one() + switch_job_status(session, job_model, JobStatus.PROVISIONING) + return _ProvisioningPhaseResult( + instance_models=[instance], + compute_group_model=None, + ) + + +async def _process_new_capacity_provisioning_path( + exit_stack: AsyncExitStack, + session: AsyncSession, + context: _SubmittedJobContext, + master_job_provisioning_data: Optional[JobProvisioningData], + volumes: list[list[Volume]], +) -> Optional[_ProvisioningPhaseResult]: + ( + fleet_model, + master_instance_provisioning_data, + ) = await _lock_fleet_and_get_master_provisioning_data( + exit_stack=exit_stack, + session=session, + fleet_model=context.fleet_model, + job=context.job, + ) + + # master_job_provisioning_data is present if there is a master job. + # master_instance_provisioning_data is present if there is a master instance (non empty cluster fleet). + master_provisioning_data = master_job_provisioning_data or master_instance_provisioning_data + provision_new_capacity_result = await _provision_new_capacity( + session=session, + project=context.project, + fleet_model=fleet_model, + job_model=context.job_model, + run=context.run, + jobs=context.jobs_to_provision, + project_ssh_public_key=context.project.ssh_public_key, + project_ssh_private_key=context.project.ssh_private_key, + master_job_provisioning_data=master_provisioning_data, + volumes=volumes, + ) + if provision_new_capacity_result is None: + logger.debug("%s: provisioning failed", fmt(context.job_model)) + await _terminate_submitted_job( + session=session, + job_model=context.job_model, + reason=JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY, ) - instance = res.unique().scalar_one() - switch_job_status(session, job_model, JobStatus.PROVISIONING) - else: - if run_profile.creation_policy == CreationPolicy.REUSE: - logger.debug("%s: reuse instance failed", fmt(job_model)) - job_model.termination_reason = JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY - job_model.termination_reason_message = "Could not reuse any instances for this job" - switch_job_status(session, job_model, JobStatus.TERMINATING) - job_model.last_processed_at = common_utils.get_current_datetime() - await session.commit() - return + return None - ( - fleet_model, - master_instance_provisioning_data, - ) = await _fetch_fleet_with_master_instance_provisioning_data( + # TODO: Drop once autocreated fleets are dropped. + if fleet_model is None: + fleet_model = await _create_fleet_model_for_job( exit_stack=exit_stack, session=session, - fleet_model=fleet_model, - job=job, + project=context.project, + run=context.run, ) - master_provisioning_data = ( - master_job_provisioning_data or master_instance_provisioning_data + session.add(fleet_model) + events.emit( + session, + f"Fleet created for job. Fleet status: {fleet_model.status.upper()}", + actor=events.SystemActor(), + targets=[ + events.Target.from_model(fleet_model), + events.Target.from_model(context.job_model), + ], ) - run_job_result = await _run_jobs_on_new_instances( - session=session, - project=project, + + return await _materialize_newly_provisioned_capacity( + session=session, + context=context, + fleet_model=fleet_model, + provision_new_capacity_result=provision_new_capacity_result, + ) + + +async def _materialize_newly_provisioned_capacity( + session: AsyncSession, + context: _SubmittedJobContext, + fleet_model: FleetModel, + provision_new_capacity_result: _ProvisionNewCapacityResult, +) -> _ProvisioningPhaseResult: + ( + provisioned_jobs, + job_provisioning_datas, + compute_group_model, + ) = _resolve_provisioned_jobs_and_data( + context=context, + fleet_model=fleet_model, + provisioning_data=provision_new_capacity_result.provisioning_data, + ) + if compute_group_model is not None: + session.add(compute_group_model) + + instance_models = await _create_instance_models_for_provisioned_jobs( + session=session, + context=context, + fleet_model=fleet_model, + compute_group_model=compute_group_model, + provisioned_jobs=provisioned_jobs, + job_provisioning_datas=job_provisioning_datas, + offer=provision_new_capacity_result.offer, + effective_profile=provision_new_capacity_result.effective_profile, + ) + + logger.info( + "%s: provisioned %s new instance(s)", + fmt(context.job_model), + len(provisioned_jobs), + ) + return _ProvisioningPhaseResult( + instance_models=instance_models, + compute_group_model=compute_group_model, + ) + + +def _resolve_provisioned_jobs_and_data( + context: _SubmittedJobContext, + fleet_model: FleetModel, + provisioning_data: Union[JobProvisioningData, ComputeGroupProvisioningData], +) -> tuple[list[Job], list[JobProvisioningData], Optional[ComputeGroupModel]]: + if isinstance(provisioning_data, ComputeGroupProvisioningData): + compute_group_model = ComputeGroupModel( + id=uuid.uuid4(), + project=context.project, + fleet=fleet_model, + status=ComputeGroupStatus.RUNNING, + provisioning_data=provisioning_data.json(), + ) + return ( + context.jobs_to_provision, + provisioning_data.job_provisioning_datas, + compute_group_model, + ) + return [context.job], [provisioning_data], None + + +async def _create_instance_models_for_provisioned_jobs( + session: AsyncSession, + context: _SubmittedJobContext, + fleet_model: FleetModel, + compute_group_model: Optional[ComputeGroupModel], + provisioned_jobs: list[Job], + job_provisioning_datas: list[JobProvisioningData], + offer: InstanceOfferWithAvailability, + effective_profile: Profile, +) -> list[InstanceModel]: + provisioned_job_models = _get_job_models_for_jobs(context.run_model.jobs, provisioned_jobs) + instance_models: list[InstanceModel] = [] + # FIXME: Fleet is not locked which may lead to duplicate instance_num. + # This is currently hard to fix without locking the fleet for entire provisioning duration. + # Processing should be done in multiple steps so that + # InstanceModel is created before provisioning. + taken_instance_nums = await _get_taken_instance_nums(session, fleet_model) + for provisioned_job_model, job_provisioning_data in zip( + provisioned_job_models, job_provisioning_datas + ): + provisioned_job_model.job_provisioning_data = job_provisioning_data.json() + switch_job_status(session, provisioned_job_model, JobStatus.PROVISIONING) + instance_num = get_next_instance_num(taken_instance_nums) + instance_model = _create_instance_model_for_job( + project=context.project, fleet_model=fleet_model, - job_model=job_model, - run=run, - jobs=jobs_to_provision, - project_ssh_public_key=project.ssh_public_key, - project_ssh_private_key=project.ssh_private_key, - master_job_provisioning_data=master_provisioning_data, - volumes=volumes, + compute_group_model=compute_group_model, + job_model=provisioned_job_model, + job_provisioning_data=job_provisioning_data, + offer=offer, + instance_num=instance_num, + profile=effective_profile, ) - if run_job_result is None: - logger.debug("%s: provisioning failed", fmt(job_model)) - job_model.termination_reason = JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY - switch_job_status(session, job_model, JobStatus.TERMINATING) - job_model.last_processed_at = common_utils.get_current_datetime() - await session.commit() - return + instance_models.append(instance_model) + taken_instance_nums.add(instance_num) + provisioned_job_model.job_runtime_data = _prepare_job_runtime_data( + offer, context.multinode + ).json() + session.add(instance_model) + events.emit( + session, + f"Instance created for job. Instance status: {instance_model.status.upper()}", + actor=events.SystemActor(), + targets=[ + events.Target.from_model(instance_model), + events.Target.from_model(provisioned_job_model), + ], + ) + provisioned_job_model.used_instance_id = instance_model.id + provisioned_job_model.last_processed_at = common_utils.get_current_datetime() + return instance_models - if fleet_model is None: - fleet_model = await _create_fleet_model_for_job( - exit_stack=exit_stack, - session=session, - project=project, - run=run, - ) - session.add(fleet_model) - events.emit( - session, - f"Fleet created for job. Fleet status: {fleet_model.status.upper()}", - actor=events.SystemActor(), - targets=[ - events.Target.from_model(fleet_model), - events.Target.from_model(job_model), - ], - ) - provisioning_data, offer, effective_profile, _ = run_job_result - compute_group_model = None - if isinstance(provisioning_data, ComputeGroupProvisioningData): - need_volume_attachment = False - provisioned_jobs = jobs_to_provision - jpds = provisioning_data.job_provisioning_datas - compute_group_model = ComputeGroupModel( - id=uuid.uuid4(), - project=project, - fleet=fleet_model, - status=ComputeGroupStatus.RUNNING, - provisioning_data=provisioning_data.json(), - ) - session.add(compute_group_model) - else: - provisioned_jobs = [job] - jpds = [provisioning_data] - - logger.info("%s: provisioned %s new instance(s)", fmt(job_model), len(provisioned_jobs)) - provisioned_job_models = _get_job_models_for_jobs(run_model.jobs, provisioned_jobs) - instance = None # Instance for attaching volumes in case of single job provisioned - # FIXME: Fleet is not locked which may lead to duplicate instance_num. - # This is currently hard to fix without locking the fleet for entire provisioning duration. - # Processing should be done in multiple steps so that - # InstanceModel is created before provisioning. - taken_instance_nums = await _get_taken_instance_nums(session, fleet_model) - for provisioned_job_model, jpd in zip(provisioned_job_models, jpds): - provisioned_job_model.job_provisioning_data = jpd.json() - switch_job_status(session, provisioned_job_model, JobStatus.PROVISIONING) - instance_num = get_next_instance_num(taken_instance_nums) - instance = _create_instance_model_for_job( - project=project, - fleet_model=fleet_model, - compute_group_model=compute_group_model, - job_model=provisioned_job_model, - job_provisioning_data=jpd, - offer=offer, - instance_num=instance_num, - profile=effective_profile, - ) - taken_instance_nums.add(instance_num) - provisioned_job_model.job_runtime_data = _prepare_job_runtime_data( - offer, multinode - ).json() - session.add(instance) - events.emit( - session, - f"Instance created for job. Instance status: {instance.status.upper()}", - actor=events.SystemActor(), - targets=[ - events.Target.from_model(instance), - events.Target.from_model(provisioned_job_model), - ], - ) - provisioned_job_model.used_instance_id = instance.id - provisioned_job_model.last_processed_at = common_utils.get_current_datetime() +async def _get_taken_instance_nums(session: AsyncSession, fleet_model: FleetModel) -> set[int]: + res = await session.execute( + select(InstanceModel.instance_num).where( + InstanceModel.fleet_id == fleet_model.id, + InstanceModel.deleted.is_(False), + ) + ) + return set(res.scalars().all()) - _allow_other_replica_jobs_to_provision(job_model, replica_job_models, jobs_to_provision) - volumes_ids = sorted([v.id for vs in volume_models for v in vs]) - if need_volume_attachment: - # Take lock to prevent attaching volumes that are to be deleted. - # If the volume was deleted before the lock, the volume will fail to attach and the job will fail. - # TODO: Lock instances for attaching volumes? - await session.execute( - select(VolumeModel) - .where(VolumeModel.id.in_(volumes_ids)) - .options(joinedload(VolumeModel.user).load_only(UserModel.name)) - .order_by(VolumeModel.id) # take locks in order - .with_for_update(key_share=True, of=VolumeModel) +def _create_instance_model_for_job( + project: ProjectModel, + fleet_model: FleetModel, + compute_group_model: Optional[ComputeGroupModel], + job_model: JobModel, + job_provisioning_data: JobProvisioningData, + offer: InstanceOfferWithAvailability, + instance_num: int, + profile: Profile, +) -> InstanceModel: + if not job_provisioning_data.dockerized: + # terminate vastai/k8s instances immediately + termination_policy = TerminationPolicy.DESTROY_AFTER_IDLE + termination_idle_time = 0 + else: + termination_policy, termination_idle_time = get_termination( + profile, DEFAULT_RUN_TERMINATION_IDLE_TIME ) - await exit_stack.enter_async_context( - get_locker(get_db().dialect_name).lock_ctx(VolumeModel.__tablename__, volumes_ids) + instance = InstanceModel( + id=uuid.uuid4(), + name=f"{fleet_model.name}-{instance_num}", + instance_num=instance_num, + project=project, + fleet=fleet_model, + compute_group=compute_group_model, + created_at=common_utils.get_current_datetime(), + started_at=common_utils.get_current_datetime(), + status=InstanceStatus.PROVISIONING, + unreachable=False, + job_provisioning_data=job_provisioning_data.json(), + offer=offer.json(), + termination_policy=termination_policy, + termination_idle_time=termination_idle_time, + jobs=[job_model], + backend=offer.backend, + price=offer.price, + region=offer.region, + volume_attachments=[], + total_blocks=1, + busy_blocks=1, + ) + return instance + + +def _prepare_job_runtime_data( + offer: InstanceOfferWithAvailability, multinode: bool +) -> JobRuntimeData: + if offer.blocks == offer.total_blocks: + if settings.JOB_NETWORK_MODE == settings.JobNetworkMode.FORCED_BRIDGE: + network_mode = NetworkMode.BRIDGE + elif settings.JOB_NETWORK_MODE == settings.JobNetworkMode.HOST_WHEN_POSSIBLE: + network_mode = NetworkMode.HOST + else: + assert settings.JOB_NETWORK_MODE == settings.JobNetworkMode.HOST_FOR_MULTINODE_ONLY + network_mode = NetworkMode.HOST if multinode else NetworkMode.BRIDGE + return JobRuntimeData( + network_mode=network_mode, + offer=offer, ) - if len(volume_models) > 0: - assert instance is not None - await _attach_volumes( - session=session, - project=project, - job_model=job_model, - instance=instance, - volume_models=volume_models, - ) + return JobRuntimeData( + network_mode=NetworkMode.BRIDGE, + offer=offer, + cpu=offer.instance.resources.cpus, + gpu=len(offer.instance.resources.gpus), + memory=Memory(offer.instance.resources.memory_mib / 1024), + ) + + +async def _finalize_submitted_job_processing( + exit_stack: AsyncExitStack, + session: AsyncSession, + context: _SubmittedJobContext, + prepared_job_volumes: _PreparedJobVolumes, + provisioning_phase_result: _ProvisioningPhaseResult, +) -> None: + _release_replica_jobs_from_master_wait( + context.job_model, + replica_job_models=context.replica_job_models, + jobs_to_provision=context.jobs_to_provision, + ) + + await _attach_job_volumes_if_needed( + exit_stack=exit_stack, + session=session, + context=context, + prepared_job_volumes=prepared_job_volumes, + provisioning_phase_result=provisioning_phase_result, + ) + await session.commit() + + +async def _attach_job_volumes_if_needed( + exit_stack: AsyncExitStack, + session: AsyncSession, + context: _SubmittedJobContext, + prepared_job_volumes: _PreparedJobVolumes, + provisioning_phase_result: _ProvisioningPhaseResult, +) -> None: + # TODO: Volume attachment for compute groups is not yet supported since + # currently supported compute groups (e.g. Runpod) don't need explicit volume attachment. + if provisioning_phase_result.compute_group_model is not None: + return + + volume_models = prepared_job_volumes.volume_models + if len(volume_models) == 0: + return + volumes_ids = sorted([v.id for vs in volume_models for v in vs]) + # Take lock to prevent attaching volumes that are to be deleted. + # If the volume was deleted before the lock, the volume will fail to attach and the job will fail. + # TODO: Lock instances for attaching volumes? + await session.execute( + select(VolumeModel) + .where(VolumeModel.id.in_(volumes_ids)) + .options(joinedload(VolumeModel.user).load_only(UserModel.name)) + .order_by(VolumeModel.id) # take locks in order + .with_for_update(key_share=True, of=VolumeModel) + ) + await exit_stack.enter_async_context( + get_locker(get_db().dialect_name).lock_ctx(VolumeModel.__tablename__, volumes_ids) + ) + assert len(provisioning_phase_result.instance_models) == 1 + await _attach_volumes( + session=session, + project=context.project, + job_model=context.job_model, + instance=provisioning_phase_result.instance_models[0], + volume_models=volume_models, + ) + + +async def _defer_submitted_job( + session: AsyncSession, + job_model: JobModel, + log_message: str, +): + logger.debug("%s: %s", fmt(job_model), log_message) + await _mark_job_processed(session=session, job_model=job_model) + + +async def _terminate_submitted_job( + session: AsyncSession, + job_model: JobModel, + reason: JobTerminationReason, + message: Optional[str] = None, +): + job_model.termination_reason = reason + if message is not None: + job_model.termination_reason_message = message + switch_job_status(session, job_model, JobStatus.TERMINATING) + await _mark_job_processed(session=session, job_model=job_model) + + +async def _mark_job_processed(session: AsyncSession, job_model: JobModel): + job_model.last_processed_at = common_utils.get_current_datetime() await session.commit() @@ -572,7 +990,7 @@ async def _refetch_fleet_models_with_instances( return fleet_models -async def _fetch_fleet_with_master_instance_provisioning_data( +async def _lock_fleet_and_get_master_provisioning_data( exit_stack: AsyncExitStack, session: AsyncSession, fleet_model: Optional[FleetModel], @@ -582,67 +1000,81 @@ async def _fetch_fleet_with_master_instance_provisioning_data( # cluster master from loaded fleet instances here. Resolve the current master via # FleetModel.current_master_instance_id so jobs follow the same master election # as FleetPipeline/InstancePipeline. - master_instance_provisioning_data = None - if is_master_job(job) and fleet_model is not None: - fleet_spec = get_fleet_spec(fleet_model) - if fleet_spec.configuration.placement == InstanceGroupPlacement.CLUSTER: - # To avoid violating fleet placement cluster during master provisioning, - # we must lock empty fleets and respect existing instances in non-empty fleets. - # On SQLite always take the lock during master provisioning for simplicity. - # It's fine to lock fleets currently locked by pipelines (with lock_* fields set) - # since we won't update fleets – we only need to ensure there is no parallel provisioning. - await exit_stack.enter_async_context( - get_locker(get_db().dialect_name).lock_ctx( - FleetModel.__tablename__, [fleet_model.id] - ) - ) - await sqlite_commit(session) - res = await session.execute( - select(FleetModel) - .where( - FleetModel.id == fleet_model.id, - ~exists().where( - InstanceModel.fleet_id == fleet_model.id, - InstanceModel.deleted == False, - ), - ) - .with_for_update(key_share=True, of=FleetModel) - .execution_options(populate_existing=True) - .options(noload(FleetModel.instances)) - ) - empty_fleet_model = res.unique().scalar() - if empty_fleet_model is not None: - fleet_model = empty_fleet_model - else: - res = await session.execute( - select(FleetModel) - .join(FleetModel.instances) - .where( - FleetModel.id == fleet_model.id, - InstanceModel.deleted == False, - ) - .options(contains_eager(FleetModel.instances)) - .execution_options(populate_existing=True) - ) - fleet_model = res.unique().scalar_one() - master_instance_provisioning_data = get_fleet_master_instance_provisioning_data( - fleet_model=fleet_model, - fleet_spec=fleet_spec, - ) + if not is_master_job(job) or fleet_model is None: + return fleet_model, None + + fleet_spec = _get_cluster_fleet_spec(fleet_model) + if fleet_spec is None: + return fleet_model, None + + # To avoid violating fleet placement cluster during master provisioning, + # we must lock empty fleets and respect existing instances in non-empty fleets. + # On SQLite always take the lock during master provisioning for simplicity. + # It's fine to lock fleets currently locked by pipelines (with lock_* fields set) + # since we won't update fleets – we only need to ensure there is no parallel provisioning. + await exit_stack.enter_async_context( + get_locker(get_db().dialect_name).lock_ctx(FleetModel.__tablename__, [fleet_model.id]) + ) + await sqlite_commit(session) + fleet_model = await _refetch_cluster_master_fleet(session=session, fleet_model=fleet_model) + master_instance_provisioning_data = get_fleet_master_instance_provisioning_data( + fleet_model=fleet_model, + fleet_spec=fleet_spec, + ) return fleet_model, master_instance_provisioning_data -async def _assign_job_to_fleet_instance( +def _get_cluster_fleet_spec(fleet_model: FleetModel) -> Optional[FleetSpec]: + fleet_spec = get_fleet_spec(fleet_model) + if fleet_spec.configuration.placement != InstanceGroupPlacement.CLUSTER: + return None + return fleet_spec + + +async def _refetch_cluster_master_fleet( + session: AsyncSession, fleet_model: FleetModel +) -> FleetModel: + res = await session.execute( + select(FleetModel) + .where( + FleetModel.id == fleet_model.id, + ~exists().where( + InstanceModel.fleet_id == fleet_model.id, + InstanceModel.deleted == False, + ), + ) + .with_for_update(key_share=True, of=FleetModel) + .execution_options(populate_existing=True) + .options(noload(FleetModel.instances)) + ) + empty_fleet_model = res.unique().scalar() + if empty_fleet_model is not None: + return empty_fleet_model + + res = await session.execute( + select(FleetModel) + .join(FleetModel.instances) + .where( + FleetModel.id == fleet_model.id, + InstanceModel.deleted == False, + ) + .options(contains_eager(FleetModel.instances)) + .execution_options(populate_existing=True) + ) + return res.unique().scalar_one() + + +async def _assign_existing_instance_to_job( session: AsyncSession, fleet_model: Optional[FleetModel], job_model: JobModel, instances_with_offers: list[tuple[InstanceModel, InstanceOfferWithAvailability]], multinode: bool, -) -> Optional[InstanceModel]: +) -> None: job_model.fleet = fleet_model job_model.instance_assigned = True if len(instances_with_offers) == 0: - return None + return instances_with_offers.sort(key=lambda instance_with_offer: instance_with_offer[0].price or 0) instance, offer = instances_with_offers[0] @@ -672,10 +1104,9 @@ async def _assign_job_to_fleet_instance( events.Target.from_model(instance), ], ) - return instance -def _get_jobs_to_provision(job: Job, replica_jobs: list[Job], job_model: JobModel) -> list[Job]: +def _select_jobs_to_provision(job: Job, replica_jobs: list[Job], job_model: JobModel) -> list[Job]: """ Returns the passed job for non-master jobs and all replica jobs for master jobs in multinode setups. """ @@ -692,7 +1123,7 @@ def _get_jobs_to_provision(job: Job, replica_jobs: list[Job], job_model: JobMode return jobs_to_provision -def _allow_other_replica_jobs_to_provision( +def _release_replica_jobs_from_master_wait( job_model: JobModel, replica_job_models: list[JobModel], jobs_to_provision: list[Job], @@ -703,7 +1134,7 @@ def _allow_other_replica_jobs_to_provision( replica_job_model.waiting_master_job = False -async def _run_jobs_on_new_instances( +async def _provision_new_capacity( session: AsyncSession, project: ProjectModel, job_model: JobModel, @@ -714,38 +1145,25 @@ async def _run_jobs_on_new_instances( master_job_provisioning_data: Optional[JobProvisioningData] = None, volumes: Optional[list[list[Volume]]] = None, fleet_model: Optional[FleetModel] = None, -) -> Optional[ - tuple[ - Union[JobProvisioningData, ComputeGroupProvisioningData], - InstanceOfferWithAvailability, - Profile, - Requirements, - ] -]: +) -> Optional[_ProvisionNewCapacityResult]: """ Provisions an instance for a job or a compute group for multiple jobs and runs the jobs. Even when multiple jobs are passes, it may still provision only one instance and run only the master job in case there are no offers supporting cluster groups. Other jobs should be provisioned one-by-one later. """ + job = jobs[0] if volumes is None: volumes = [] - job = jobs[0] - profile = run.run_spec.merged_profile - requirements = job.job_spec.requirements - if fleet_model is not None: - fleet_spec = get_fleet_spec(fleet_model) - try: - check_can_create_new_cloud_instance_in_fleet(fleet_model, fleet_spec) - profile, requirements = get_run_profile_and_requirements_in_fleet( - job=job, - run_spec=run.run_spec, - fleet_spec=fleet_spec, - ) - except ValueError as e: - logger.debug("%s: %s", fmt(job_model), e.args[0]) - return None - # TODO: Respect fleet provisioning properties such as tags + effective_profile_and_requirements = _get_effective_profile_and_requirements( + job_model=job_model, + run=run, + job=job, + fleet_model=fleet_model, + ) + if effective_profile_and_requirements is None: + return None + profile, requirements = effective_profile_and_requirements # The placement group is determined when provisioning the master instance # and used for all other instances in the fleet. @@ -824,19 +1242,26 @@ async def _run_jobs_on_new_instances( project_ssh_private_key, placement_group_model_to_placement_group_optional(placement_group_model), ) - return cgpd, offer, profile, requirements - else: - jpd = await common_utils.run_async( - compute.run_job, - run, - job, - offer, - project_ssh_public_key, - project_ssh_private_key, - offer_volumes, - placement_group_model_to_placement_group_optional(placement_group_model), + return _ProvisionNewCapacityResult( + provisioning_data=cgpd, + offer=offer, + effective_profile=profile, ) - return jpd, offer, profile, requirements + jpd = await common_utils.run_async( + compute.run_job, + run, + job, + offer, + project_ssh_public_key, + project_ssh_private_key, + offer_volumes, + placement_group_model_to_placement_group_optional(placement_group_model), + ) + return _ProvisionNewCapacityResult( + provisioning_data=jpd, + offer=offer, + effective_profile=profile, + ) except BackendError as e: logger.warning( "%s: %s launch in %s/%s failed: %s", @@ -865,6 +1290,32 @@ async def _run_jobs_on_new_instances( return None +def _get_effective_profile_and_requirements( + job_model: JobModel, + run: Run, + job: Job, + fleet_model: Optional[FleetModel], +) -> Optional[tuple[Profile, Requirements]]: + effective_profile = run.run_spec.merged_profile + requirements = job.job_spec.requirements + if fleet_model is None: + return effective_profile, requirements + + fleet_spec = get_fleet_spec(fleet_model) + try: + check_can_create_new_cloud_instance_in_fleet(fleet_model, fleet_spec) + effective_profile, requirements = get_run_profile_and_requirements_in_fleet( + job=job, + run_spec=run.run_spec, + fleet_spec=fleet_spec, + ) + except ValueError as e: + logger.debug("%s: %s", fmt(job_model), e.args[0]) + return None + # TODO: Respect fleet provisioning properties such as tags + return effective_profile, requirements + + async def _create_fleet_model_for_job( exit_stack: AsyncExitStack, session: AsyncSession, @@ -912,88 +1363,10 @@ async def _create_fleet_model_for_job( return fleet_model -async def _get_taken_instance_nums(session: AsyncSession, fleet_model: FleetModel) -> set[int]: - res = await session.execute( - select(InstanceModel.instance_num).where( - InstanceModel.fleet_id == fleet_model.id, - InstanceModel.deleted.is_(False), - ) - ) - return set(res.scalars().all()) - - -def _create_instance_model_for_job( - project: ProjectModel, - fleet_model: FleetModel, - compute_group_model: Optional[ComputeGroupModel], - job_model: JobModel, - job_provisioning_data: JobProvisioningData, - offer: InstanceOfferWithAvailability, - instance_num: int, - profile: Profile, -) -> InstanceModel: - if not job_provisioning_data.dockerized: - # terminate vastai/k8s instances immediately - termination_policy = TerminationPolicy.DESTROY_AFTER_IDLE - termination_idle_time = 0 - else: - termination_policy, termination_idle_time = get_termination( - profile, DEFAULT_RUN_TERMINATION_IDLE_TIME - ) - instance = InstanceModel( - id=uuid.uuid4(), - name=f"{fleet_model.name}-{instance_num}", - instance_num=instance_num, - project=project, - fleet=fleet_model, - compute_group=compute_group_model, - created_at=common_utils.get_current_datetime(), - started_at=common_utils.get_current_datetime(), - status=InstanceStatus.PROVISIONING, - unreachable=False, - job_provisioning_data=job_provisioning_data.json(), - offer=offer.json(), - termination_policy=termination_policy, - termination_idle_time=termination_idle_time, - jobs=[job_model], - backend=offer.backend, - price=offer.price, - region=offer.region, - volume_attachments=[], - total_blocks=1, - busy_blocks=1, - ) - return instance - - -def _prepare_job_runtime_data( - offer: InstanceOfferWithAvailability, multinode: bool -) -> JobRuntimeData: - if offer.blocks == offer.total_blocks: - if settings.JOB_NETWORK_MODE == settings.JobNetworkMode.FORCED_BRIDGE: - network_mode = NetworkMode.BRIDGE - elif settings.JOB_NETWORK_MODE == settings.JobNetworkMode.HOST_WHEN_POSSIBLE: - network_mode = NetworkMode.HOST - else: - assert settings.JOB_NETWORK_MODE == settings.JobNetworkMode.HOST_FOR_MULTINODE_ONLY - network_mode = NetworkMode.HOST if multinode else NetworkMode.BRIDGE - return JobRuntimeData( - network_mode=network_mode, - offer=offer, - ) - return JobRuntimeData( - network_mode=NetworkMode.BRIDGE, - offer=offer, - cpu=offer.instance.resources.cpus, - gpu=len(offer.instance.resources.gpus), - memory=Memory(offer.instance.resources.memory_mib / 1024), - ) - - def _get_offer_volumes( - volumes: List[List[Volume]], + volumes: list[list[Volume]], offer: InstanceOfferWithAvailability, -) -> List[Volume]: +) -> list[Volume]: """ Returns volumes suitable for the offer for each mount point. """ @@ -1004,7 +1377,7 @@ def _get_offer_volumes( def _get_offer_mount_point_volume( - volumes: List[Volume], + volumes: list[Volume], offer: InstanceOfferWithAvailability, ) -> Volume: """ @@ -1025,7 +1398,7 @@ async def _attach_volumes( project: ProjectModel, job_model: JobModel, instance: InstanceModel, - volume_models: List[List[VolumeModel]], + volume_models: list[list[VolumeModel]], ): job_provisioning_data = common_utils.get_or_error(get_instance_provisioning_data(instance)) backend = await get_project_backend_by_type_or_error( @@ -1106,15 +1479,3 @@ async def _attach_volume( instance.volume_attachments.append(volume_attachment_model) volume_model.last_job_processed_at = common_utils.get_current_datetime() - - -def _get_job_models_for_jobs( - job_models: list[JobModel], - jobs: list[Job], -) -> list[JobModel]: - """ - Returns job models of latest submissions for a list of jobs. - Preserves jobs order. - """ - id_to_job_model_map = {jm.id: jm for jm in job_models} - return [id_to_job_model_map[j.job_submissions[-1].id] for j in jobs]