Skip to content

Commit 1d91f85

Browse files
authored
Fix Barrier when parent.async() yields early (#449)
If `parent.async()` returns before running `@tasks.append(node)`, then `barrier.async()` itself will return before `@tasks` is updated, causing `barrier.wait` to not actually wait on the new task.
1 parent 9182cde commit 1d91f85

4 files changed

Lines changed: 51 additions & 5 deletions

File tree

fixtures/async/chainable_async.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@ module Async
99
let(:chainable) {subject.new(parent: parent)}
1010

1111
it "should chain async to parent" do
12-
expect(parent).to receive(:async).and_return(nil)
12+
expect(parent).to receive(:async).and_return{|*arguments, **options, &block|
13+
Async(*arguments, **options, &block)
14+
}
1315

1416
chainable.async do
1517
# Nothing.

lib/async/barrier.rb

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ class Barrier
1818
def initialize(parent: nil)
1919
@tasks = List.new
2020
@finished = Queue.new
21+
@condition = Condition.new
2122

2223
@parent = parent
2324
end
@@ -42,18 +43,32 @@ def size
4243

4344
# Execute a child task and add it to the barrier.
4445
# @asynchronous Executes the given block concurrently.
46+
# @returns [Task] The task which was created to execute the block.
4547
def async(*arguments, parent: (@parent or Task.current), **options, &block)
4648
raise "Barrier is stopped!" if @finished.closed?
4749

4850
waiting = nil
4951

50-
parent.async(*arguments, **options) do |task, *arguments|
51-
waiting = TaskNode.new(task)
52-
@tasks.append(waiting)
52+
task = parent.async(*arguments, **options) do |task, *arguments|
53+
# Create a new list node for the task and add it to the list of waiting tasks:
54+
node = TaskNode.new(task)
55+
@tasks.append(node)
56+
57+
# Signal the outer async block that we have added the task to the list of waiting tasks, and that it can now wait for it to finish:
58+
waiting = node
59+
@condition.signal
60+
61+
# Invoke the block, which may raise an error. If it does, we will still signal that the task has finished:
5362
block.call(task, *arguments)
5463
ensure
55-
@finished.signal(waiting) unless @finished.closed?
64+
# Signal that the task has finished, which will unblock the waiting task:
65+
@finished.signal(node) unless @finished.closed?
5666
end
67+
68+
# `parent.async` may yield before the child block executes, so we wait here until the child has appended itself to `@tasks`, ensuring `wait` cannot return early and miss tracking it:
69+
@condition.wait while waiting.nil?
70+
71+
return task
5772
end
5873

5974
# Whether there are any tasks being held by the barrier.

releases.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Releases
22

3+
## Unreleased
4+
5+
- Fix `Barrier#async` when `parent.async` yields before the child block executes. Previously, `Barrier#wait` could return early and miss tracking the task entirely, because the task had not yet appended itself to the barrier's task list.
6+
37
## v2.38.0
48

59
- Rename `Task#stop` to `Task#cancel` for better clarity and consistency with common concurrency terminology. The old `stop` method is still available as an alias for backward compatibility, but it is recommended to use `cancel` going forward.

test/async/barrier.rb

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,31 @@
133133
barrier.stop
134134
end
135135
end
136+
137+
it "waits even if the child task yields immediately" do
138+
class Yielder
139+
def async(*arguments, **options, &block)
140+
Async(*arguments, **options) do |task, *arguments|
141+
task.yield
142+
block.call(task, *arguments)
143+
end
144+
end
145+
end
146+
147+
parent = Yielder.new
148+
149+
3.times do |i|
150+
barrier.async(parent:){i}
151+
end
152+
153+
expect(barrier.size).to be == 3
154+
155+
results = []
156+
barrier.wait do |task|
157+
results << task.wait
158+
end
159+
expect(results.sort).to be == [0, 1, 2]
160+
end
136161
end
137162

138163
with "#stop" do

0 commit comments

Comments
 (0)