diff --git a/task/list.go b/task/list.go index 5b09d34c6..5b9e9395e 100644 --- a/task/list.go +++ b/task/list.go @@ -70,19 +70,25 @@ func (list *List) consumer() { task.wgTask.Done() list.wg.Done() + unlocked := false for _, t := range list.tasks { if t.State == IDLE { // check resources blockingTasks := list.usedResources.UsedBy(t.resources) if len(blockingTasks) == 0 { - list.usedResources.MarkInUse(task.resources, task) + list.usedResources.MarkInUse(t.resources, t) + // unlock list since queueing may block + list.Unlock() + unlocked = true list.queue <- t break } } } + if !unlocked { + list.Unlock() + } } - list.Unlock() }() case <-list.queueDone: @@ -187,7 +193,6 @@ func (list *List) GetTaskReturnValueByID(ID int) (*ProcessReturnValue, error) { // become available. func (list *List) RunTaskInBackground(name string, resources []string, process Process) (Task, *ResourceConflictError) { list.Lock() - defer list.Unlock() list.idCounter++ wgTask := &sync.WaitGroup{} @@ -204,7 +209,11 @@ func (list *List) RunTaskInBackground(name string, resources []string, process P tasks := list.usedResources.UsedBy(resources) if len(tasks) == 0 { list.usedResources.MarkInUse(task.resources, task) + // queueing task might block if channel not ready, unlock list before queueing + list.Unlock() list.queue <- task + } else { + list.Unlock() } return *task, nil