142 changes: 81 additions & 61 deletions pkg/scheduler/actions/reclaim/reclaim.go
@@ -155,6 +155,7 @@ func getReclaimedResources(ssn *framework.Session, pendingJob *api.JobInfo, runn

for _, n := range pendingJobTopology {
finalPendingJobTopology[n.PendingTask.Name] = n
// Count total reclaimed capacity (idle + evicted) allocated to this pending task
reclaimedGPU += n.GPU
}

@@ -179,26 +180,6 @@ func isQueueOverused(ssn *framework.Session, victimJob *api.JobInfo) bool {
return queueAllocatedGPUs > queueDeservedGPUs
}

func noBudgetViolationAfterReclaim(ssn *framework.Session, victimJob, pendingJob *api.JobInfo) bool {
queueAllocatedGPUs := ssn.Queues[victimJob.Queue].GetAllocatedGPU()
queueDeservedGPUs := ssn.Queues[victimJob.Queue].GetDeservedGPU()

withoutVictimJob := int64(0)
if len(victimJob.Tasks) == int(victimJob.MinAvailable) {
// when it's not an elastic workload, we check the total request of the job
withoutVictimJob = queueAllocatedGPUs - victimJob.GetTotalRequestGPU()
} else {
// when it's an elastic workload, we check the minimum between the elastic GPUs and the requested GPUs of the victim job
elasticGPUs := victimJob.GetElasticGPUs()
requestedGPUs := pendingJob.GetTotalRequestGPU()
withoutVictimJob = queueAllocatedGPUs - min(elasticGPUs, requestedGPUs)
}
if queueDeservedGPUs <= withoutVictimJob {
return true
}
return false
}
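
For reviewers, a rough worked example of the budget check being deleted above, using made-up numbers and mirroring the removed logic rather than any real scheduler API (the function and variable names here are hypothetical):

// Hypothetical queue state: 10 GPUs allocated, 6 deserved.
// Non-elastic victim requesting 4 GPUs: 10 - 4 = 6 and 6 <= 6, so the queue keeps
// at least its deserved share after the reclaim (no budget violation, returns true).
// Elastic case with 3 elastic GPUs while the pending job asks for 5: only
// min(3, 5) = 3 GPUs are subtracted, 10 - 3 = 7 >= 6, still no violation.
func noBudgetViolationExample() bool {
    queueAllocatedGPUs := int64(10)
    queueDeservedGPUs := int64(6)
    victimTotalRequestGPU := int64(4)

    withoutVictimJob := queueAllocatedGPUs - victimTotalRequestGPU
    return queueDeservedGPUs <= withoutVictimJob
}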

type EvictTask struct {
NodeName string
GPU int64
@@ -207,68 +188,107 @@ type EvictTask struct {
}

func findNodesForPendingJob(ssn *framework.Session, victimJob, pendingJob *api.JobInfo) map[string]*EvictTask {
// topology maps have "required number of GPUs" -> "node names"
// {1: ["node1", "node2"]} means the nodes
// using the VictimTask struct instead of a single node name string because we need to carry the task information
// and the idle gpu count on that node for the calculation below
victimNodes := map[string]*EvictTask{}
for _, task := range victimJob.Tasks {
node := ssn.Nodes[task.NodeName]
// Build per-node capacity with idle counted once and evictable tasks listed
type nodeCapacity struct {
nodeName string
idleGPU int64
evictable []*api.TaskInfo
evictableGPU int64
}

nodeCaps := map[string]*nodeCapacity{}
for _, t := range victimJob.Tasks {
node := ssn.Nodes[t.NodeName]
if node == nil {
continue
}
// we need to consider future idle in case the node wasn't fully occupied by the victim task
nodeFutureIdleGPU := node.FutureIdle().ScalarResources["nvidia.com/gpu"]
numGPU := int64(task.Resreq.ScalarResources["nvidia.com/gpu"] + nodeFutureIdleGPU)
tasks, ok := victimNodes[task.NodeName]
gpu := int64(t.Resreq.ScalarResources["nvidia.com/gpu"])
cap, ok := nodeCaps[t.NodeName]
if !ok {
victimNodes[task.NodeName] = &EvictTask{
NodeName: task.NodeName,
GPU: numGPU,
TasksToEvict: []*api.TaskInfo{task},
}
} else {
victimNodes[task.NodeName] = &EvictTask{
NodeName: task.NodeName,
GPU: tasks.GPU + numGPU,
TasksToEvict: append(tasks.TasksToEvict, task),
cap = &nodeCapacity{
nodeName: t.NodeName,
idleGPU: int64(node.FutureIdle().ScalarResources["nvidia.com/gpu"]),
evictable: []*api.TaskInfo{},
evictableGPU: 0,
}
nodeCaps[t.NodeName] = cap
}
cap.evictable = append(cap.evictable, t)
cap.evictableGPU += gpu
}

// record task name -> node name so we know how to pipeline the tasks later
pendingJobTopology := map[string]*EvictTask{}
for _, task := range pendingJob.Tasks {
requiredGPU := int64(task.Resreq.ScalarResources["nvidia.com/gpu"])
for _, node := range victimNodes {
if node.GPU < requiredGPU {
required := int64(task.Resreq.ScalarResources["nvidia.com/gpu"])
placed := false
for _, cap := range nodeCaps {
totalAvail := cap.idleGPU + cap.evictableGPU
if totalAvail < required {
continue
}

// simulate consumption on this node
remainingIdle := cap.idleGPU
remainingTasks := make([]*api.TaskInfo, len(cap.evictable))
copy(remainingTasks, cap.evictable)

result := &EvictTask{
NodeName: node.NodeName,
NodeName: cap.nodeName,
PendingTask: task,
GPU: int64(0),
GPU: 0,
}
if len(node.TasksToEvict) == 0 {
node.GPU -= requiredGPU
result.GPU = requiredGPU
pendingJobTopology[task.Name] = result
delete(pendingJob.Tasks, task.UID)
break
}
for idx, t := range node.TasksToEvict {
requiredGPU -= int64(t.Resreq.ScalarResources["nvidia.com/gpu"])
result.TasksToEvict = append(result.TasksToEvict, t)
result.GPU += int64(t.Resreq.ScalarResources["nvidia.com/gpu"])
node.GPU -= int64(t.Resreq.ScalarResources["nvidia.com/gpu"])
node.TasksToEvict = deleteFromSlice(node.TasksToEvict, idx)
if requiredGPU == 0 {
break

// use idle first
if remainingIdle > 0 {
use := remainingIdle
if use > required {
use = required
}
remainingIdle -= use
required -= use
result.GPU += use
}

// then evict tasks until satisfied
evictedIdx := 0
for required > 0 && evictedIdx < len(remainingTasks) {
vt := remainingTasks[evictedIdx]
g := int64(vt.Resreq.ScalarResources["nvidia.com/gpu"])
result.TasksToEvict = append(result.TasksToEvict, vt)
result.GPU += g
required -= g
evictedIdx++
}

if required > 0 {
// not enough even after evictions; try another node
continue
}

// commit consumption to this node capacity
cap.idleGPU = remainingIdle
if evictedIdx >= len(remainingTasks) {
cap.evictable = []*api.TaskInfo{}
} else {
cap.evictable = remainingTasks[evictedIdx:]
}
// recompute evictableGPU after removing evicted tasks
newEvictableGPU := int64(0)
for _, vt := range cap.evictable {
newEvictableGPU += int64(vt.Resreq.ScalarResources["nvidia.com/gpu"])
}
cap.evictableGPU = newEvictableGPU

pendingJobTopology[task.Name] = result
delete(pendingJob.Tasks, task.UID)
placed = true
break
}
if !placed {
// could not place this pending task with this victim job's nodes
continue
}
}
return pendingJobTopology
}
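
As a sanity check on the new placement flow above (skip nodes whose idle plus evictable capacity is too small, consume idle GPUs first, then evict victim tasks until the pending task fits), here is a minimal self-contained sketch. The fakeTask/fakeNode types and the place helper are stand-ins invented for illustration, not types from the scheduler's api or framework packages:

package main

import "fmt"

// Simplified stand-ins for the scheduler's task and node objects (illustrative only).
type fakeTask struct {
    name string
    gpu  int64
}

type fakeNode struct {
    name      string
    idleGPU   int64
    evictable []fakeTask
}

// place mirrors the idle-first, then-evict greedy step for a single pending task:
// skip nodes whose idle + evictable capacity is too small, otherwise consume idle
// GPUs first and evict whole victim tasks until the requirement is covered.
func place(nodes []*fakeNode, required int64) (node string, evicted []string, ok bool) {
    for _, n := range nodes {
        avail := n.idleGPU
        for _, t := range n.evictable {
            avail += t.gpu
        }
        if avail < required {
            continue
        }

        need := required
        // use idle first
        if n.idleGPU >= need {
            n.idleGPU -= need
            return n.name, nil, true
        }
        need -= n.idleGPU
        n.idleGPU = 0

        // then evict whole tasks until the remainder is covered
        i := 0
        for need > 0 && i < len(n.evictable) {
            evicted = append(evicted, n.evictable[i].name)
            need -= n.evictable[i].gpu
            i++
        }
        n.evictable = n.evictable[i:]
        return n.name, evicted, true
    }
    return "", nil, false
}

func main() {
    nodes := []*fakeNode{
        {name: "node-a", idleGPU: 1, evictable: []fakeTask{{"victim-0", 2}, {"victim-1", 2}}},
    }
    node, evicted, ok := place(nodes, 3)
    fmt.Println(node, evicted, ok) // node-a [victim-0] true
}

Unlike this single-task sketch, the new loop in the diff also commits the consumed idle GPUs back to the per-node capacity and recomputes the remaining evictable GPU count, so later pending tasks of the same job see the reduced capacity.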