Skip to content

Commit c5892ad

Browse files
committed
Unify cancellation handling between Promise and Task.
1 parent 30a5074 commit c5892ad

4 files changed

Lines changed: 87 additions & 75 deletions

File tree

lib/async/promise.rb

Lines changed: 64 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
require_relative "error"
88
require_relative "deadline"
9+
require_relative "cancel"
910

1011
module Async
1112
# A promise represents a value that will be available in the future.
@@ -77,42 +78,6 @@ def value
7778
@mutex.synchronize{@resolved ? @value : nil}
7879
end
7980

80-
# Wait for the promise to be resolved and return the value.
81-
# If already resolved, returns immediately. If rejected, raises the stored exception.
82-
#
83-
# @parameter timeout [Numeric | Nil] Maximum time to wait. If nil, waits indefinitely. If 0, raises immediately if not resolved.
84-
# @returns [Object] The resolved value.
85-
# @raises [Exception] The rejected or cancelled exception.
86-
# @raises [Async::TimeoutError] If timeout expires before the promise is resolved.
87-
def wait(timeout: nil)
88-
@mutex.synchronize do
89-
# Increment waiting count:
90-
@waiting += 1
91-
92-
begin
93-
# Wait for resolution if not already resolved:
94-
unless @resolved
95-
if timeout.nil?
96-
wait_indefinitely
97-
else
98-
wait_with_timeout(timeout)
99-
end
100-
end
101-
102-
# Return value or raise exception based on resolution type:
103-
if @resolved == :completed
104-
return @value
105-
else
106-
# Both :failed and :cancelled store exceptions in @value
107-
raise @value
108-
end
109-
ensure
110-
# Decrement waiting count when done:
111-
@waiting -= 1
112-
end
113-
end
114-
end
115-
11681
# Wait indefinitely for the promise to be resolved.
11782
private def wait_indefinitely
11883
until @resolved
@@ -122,14 +87,14 @@ def wait(timeout: nil)
12287

12388
# Wait for the promise to be resolved, respecting the deadline timeout.
12489
# @parameter timeout [Numeric] The timeout duration.
125-
# @raises [Async::TimeoutError] If the timeout expires before resolution.
90+
# @returns [Boolean] True if resolved, false if timeout expires.
12691
private def wait_with_timeout(timeout)
12792
# Create deadline for timeout tracking:
12893
deadline = Deadline.start(timeout)
12994

13095
# Handle immediate timeout (non-blocking):
13196
if deadline == Deadline::Zero && !@resolved
132-
raise Async::TimeoutError, "Promise wait not resolved!"
97+
return false
13398
end
13499

135100
# Wait with deadline tracking:
@@ -139,11 +104,67 @@ def wait(timeout: nil)
139104

140105
# Check if deadline has expired before waiting:
141106
if remaining <= 0
142-
raise Async::TimeoutError, "Promise wait timed out!"
107+
return false
143108
end
144109

145110
@condition.wait(@mutex, remaining)
146111
end
112+
113+
return true
114+
end
115+
116+
# Wait for the promise to be resolved (without raising exceptions).
117+
#
118+
# If already resolved, returns immediately. Otherwise, waits until resolution or timeout.
119+
#
120+
# @parameter timeout [Numeric | Nil] Maximum time to wait. If nil, waits indefinitely. If 0, returns immediately if not resolved.
121+
# @returns [Boolean] True if the promise is resolved, false if timeout expires
122+
def wait?(timeout: nil)
123+
unless @resolved
124+
@mutex.synchronize do
125+
# Increment waiting count:
126+
@waiting += 1
127+
128+
begin
129+
# Wait for resolution if not already resolved:
130+
unless @resolved
131+
if timeout.nil?
132+
wait_indefinitely
133+
else
134+
unless wait_with_timeout(timeout)
135+
# We don't want to race on @resolved after exiting the mutex:
136+
return nil
137+
end
138+
end
139+
end
140+
ensure
141+
# Decrement waiting count when done:
142+
@waiting -= 1
143+
end
144+
end
145+
end
146+
147+
return @resolved
148+
end
149+
150+
# Wait for the promise to be resolved and return the value.
151+
#
152+
# If already resolved, returns immediately. If rejected, raises the stored exception.
153+
#
154+
# @returns [Object] The resolved value.
155+
# @raises [Exception] The rejected or cancelled exception.
156+
# @raises [Async::TimeoutError] If timeout expires before the promise is resolved.
157+
def wait(...)
158+
resolved = wait?(...)
159+
160+
if resolved.nil?
161+
raise TimeoutError, "Timeout while waiting for promise!"
162+
elsif resolved == :completed
163+
return @value
164+
elsif @value
165+
# If we aren't completed, we should have an exception or cancel reason stored:
166+
raise @value
167+
end
147168
end
148169

149170
# Resolve the promise with a value.
@@ -155,8 +176,8 @@ def resolve(value)
155176
@mutex.synchronize do
156177
return if @resolved
157178

158-
@value = value
159179
@resolved = :completed
180+
@value = value
160181

161182
# Wake up all waiting fibers:
162183
@condition.broadcast
@@ -174,8 +195,8 @@ def reject(exception)
174195
@mutex.synchronize do
175196
return if @resolved
176197

177-
@value = exception
178198
@resolved = :failed
199+
@value = exception
179200

180201
# Wake up all waiting fibers:
181202
@condition.broadcast
@@ -184,20 +205,16 @@ def reject(exception)
184205
return nil
185206
end
186207

187-
# Exception used to indicate cancellation.
188-
class Cancel < Exception
189-
end
190-
191208
# Cancel the promise, indicating cancellation.
192209
# All current and future waiters will receive nil.
193210
# Can only be called on pending promises - no-op if already resolved.
194-
def cancel(exception = Cancel.new("Promise was cancelled!"))
211+
def cancel(exception = Cancel.new("Promise cancelled!"))
195212
@mutex.synchronize do
196213
# No-op if already in any final state
197214
return if @resolved
198215

199-
@value = exception
200216
@resolved = :cancelled
217+
@value = exception
201218

202219
# Wake up all waiting fibers:
203220
@condition.broadcast

lib/async/task.rb

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -270,13 +270,8 @@ def async(*arguments, **options, &block)
270270
def wait
271271
raise "Cannot wait on own fiber!" if Fiber.current.equal?(@fiber)
272272

273-
# Wait for the task to complete - Promise handles all the complexity:
274-
begin
275-
@promise.wait
276-
rescue Promise::Cancel
277-
# For backward compatibility, cancelled tasks return nil:
278-
return nil
279-
end
273+
# Wait for the task to complete:
274+
@promise.wait
280275
end
281276

282277
# For compatibility with `Thread#join` and similar interfaces.
@@ -483,8 +478,8 @@ def failed!(exception = false)
483478
def cancelled!
484479
# Console.info(self, status:) {"Task #{self} was cancelled with #{@children&.size.inspect} children!"}
485480

486-
# Cancel the promise:
487-
@promise.cancel
481+
# Cancel the promise, specify nil here so that no exception is raised when waiting on the promise:
482+
@promise.cancel(nil)
488483

489484
cancelled = false
490485

test/async/promise.rb

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@
201201

202202
expect do
203203
promise.wait(timeout: 0)
204-
end.to raise_exception(Async::TimeoutError, message: be =~ /not resolved/)
204+
end.to raise_exception(Async::TimeoutError, message: be =~ /Timeout/i)
205205
end
206206

207207
it "raises TimeoutError after timeout expires" do
@@ -223,7 +223,7 @@
223223
elapsed = Time.now - start_time
224224

225225
expect(error).to be_a(Async::TimeoutError)
226-
expect(error.message).to be =~ /timed out/
226+
expect(error.message).to be =~ /Timeout/i
227227
expect(elapsed).to be >= 0.1
228228
expect(elapsed).to be < 0.5
229229
expect(promise).not.to be(:waiting?)
@@ -333,11 +333,11 @@
333333
expect(promise.cancelled?).to be == true
334334
expect(promise.completed?).to be == false
335335
expect(promise.failed?).to be == false
336-
expect(promise.value).to be_a(Async::Promise::Cancel)
336+
expect(promise.value).to be_a(Async::Cancel)
337337

338338
expect do
339339
promise.wait
340-
end.to raise_exception(Async::Promise::Cancel, message: be =~ /cancelled/)
340+
end.to raise_exception(Async::Cancel, message: be =~ /cancelled/)
341341
end
342342

343343
it "can be cancelled with custom exception" do
@@ -368,7 +368,7 @@
368368
expect(promise.cancelled?).to be == true
369369
expect do
370370
promise.wait
371-
end.to raise_exception(Async::Promise::Cancel)
371+
end.to raise_exception(Async::Cancel)
372372
end
373373

374374
it "ignores reject after cancel" do
@@ -378,7 +378,7 @@
378378
expect(promise.cancelled?).to be == true
379379
expect do
380380
promise.wait
381-
end.to raise_exception(Async::Promise::Cancel)
381+
end.to raise_exception(Async::Cancel)
382382
end
383383

384384
it "handles multiple concurrent waiters with cancellation" do
@@ -524,7 +524,7 @@
524524
end
525525

526526
it "handles Cancel exceptions (absorbed, not re-raised)" do
527-
cancel_exception = Async::Promise::Cancel.new("user cancelled")
527+
cancel_exception = Async::Cancel.new("user cancelled")
528528

529529
result = promise.fulfill do
530530
raise cancel_exception
@@ -538,14 +538,14 @@
538538
# But promise.wait will raise the cancel exception:
539539
expect do
540540
promise.wait
541-
end.to raise_exception(Async::Promise::Cancel, message: be =~ /user cancelled/)
541+
end.to raise_exception(Async::Cancel, message: be =~ /user cancelled/)
542542
end
543543

544544
it "handles custom Cancel exceptions" do
545545
custom_cancel = StandardError.new("custom stop")
546546

547547
result = promise.fulfill do
548-
raise Async::Promise::Cancel.new("wrapper").tap{|c| c.instance_variable_set(:@cause, custom_cancel)}
548+
raise Async::Cancel.new("wrapper").tap{|c| c.instance_variable_set(:@cause, custom_cancel)}
549549
end
550550

551551
expect(result).to be_nil
@@ -646,7 +646,7 @@
646646
promise3 = Async::Promise.new
647647

648648
# Cancel exception should be caught specifically:
649-
promise1.fulfill{raise Async::Promise::Cancel.new}
649+
promise1.fulfill{raise Async::Cancel.new}
650650
expect(promise1.cancelled?).to be == true
651651

652652
# StandardError should be caught by rescue =>:
@@ -736,7 +736,7 @@
736736
expect(promise.value).to be == original_value
737737
expect do
738738
promise.wait
739-
end.to raise_exception(Async::Promise::Cancel)
739+
end.to raise_exception(Async::Cancel)
740740
end
741741

742742
it "reject after cancel is ignored" do
@@ -749,7 +749,7 @@
749749
expect(promise.value).to be == original_value
750750
expect do
751751
promise.wait
752-
end.to raise_exception(Async::Promise::Cancel)
752+
end.to raise_exception(Async::Cancel)
753753
end
754754

755755
it "fulfill after cancel raises already resolved error" do

test/async/task.rb

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -358,9 +358,9 @@
358358
end
359359

360360
it "can stop nested tasks with exception handling" do
361+
parent = child = nil
362+
361363
reactor.run do
362-
child = nil
363-
364364
parent = reactor.async do |task|
365365
child = task.async do |subtask|
366366
# Never comes back:
@@ -370,20 +370,20 @@
370370
begin
371371
child.wait
372372
ensure
373-
child.stop
373+
child.cancel
374374
end
375375
end
376376

377377
# Ensure the parent has a chance to run:
378378
Fiber.scheduler.yield
379379

380-
parent.stop
380+
parent.cancel
381381
parent.wait
382-
expect(parent).to be(:cancelled?)
383-
384382
child.wait
385-
expect(child).to be(:cancelled?)
386383
end
384+
385+
expect(parent).to be(:cancelled?)
386+
expect(child).to be(:cancelled?)
387387
end
388388

389389
it "can stop current task" do

0 commit comments

Comments
 (0)