From babb0da511c6ce95ef918b7186bf5f4f9a73fa00 Mon Sep 17 00:00:00 2001 From: Peter Winton Date: Sun, 12 Apr 2026 21:06:48 -0400 Subject: [PATCH 1/5] Kill replaced worker processes when interrupted --- lib/parallel.rb | 55 ++++++++++++++++---------------- spec/cases/isolated_interrupt.rb | 13 ++++++++ spec/parallel_spec.rb | 10 ++++-- 3 files changed, 48 insertions(+), 30 deletions(-) create mode 100644 spec/cases/isolated_interrupt.rb diff --git a/lib/parallel.rb b/lib/parallel.rb index f8494d9..7d113cb 100644 --- a/lib/parallel.rb +++ b/lib/parallel.rb @@ -169,24 +169,44 @@ class UserInterruptHandler class << self # kill all these pids or threads if user presses Ctrl+c - def kill_on_ctrl_c(pids, options) + def kill_on_ctrl_c(workers, options) @to_be_killed ||= [] old_interrupt = nil signal = options.fetch(:interrupt_signal, INTERRUPT_SIGNAL) if @to_be_killed.empty? - old_interrupt = trap_interrupt(signal) do + # Wrap the existing interrupt handler to kill the workers first. Workers may have been replaced, + # so get the latest pids. + # 1. The worker arrays in @to_be_killed are protected by options[:mutex]. + # 2. Mutexes cannot be obtained in a trap context. + # 3. To preserve semantics, the workers must be killed before the old handler runs. + # 4. To preserve semantics, the old handler must run in a trap context on the main thread. + kill_thread = nil + old_interrupt = Signal.trap(signal) do + next if kill_thread warn 'Parallel execution interrupted, exiting ...' - @to_be_killed.flatten.each { |pid| kill(pid) } - end + kill_thread = Thread.new do + pids = options[:mutex].synchronize do + @to_be_killed.flatten(1).map(&:pid) + # FUTURE: stop JobFactory from spawning new workers + end + pids.each { |pid| kill(pid) } + if old_interrupt == "DEFAULT" + Signal.trap(signal) { raise Interrupt } + else + Signal.trap(signal, old_interrupt) + end + Process.kill(signal, Process.pid) # run the old interrupt handler + end + end || "DEFAULT" end - @to_be_killed << pids + @to_be_killed << workers yield ensure @to_be_killed.pop # do not kill pids that could be used for new processes - restore_interrupt(old_interrupt, signal) if @to_be_killed.empty? + Signal.trap(signal, old_interrupt) if @to_be_killed.empty? # restore the old handler on our way out end def kill(thing) @@ -195,27 +215,6 @@ def kill(thing) # some linux systems already automatically killed the children at this point # so we just ignore them not being there end - - private - - def trap_interrupt(signal) - old = Signal.trap signal, 'IGNORE' - - Signal.trap signal do - yield - if !old || old == "DEFAULT" - raise Interrupt - else - old.call - end - end - - old - end - - def restore_interrupt(old, signal) - Signal.trap signal, old - end end end @@ -557,7 +556,7 @@ def work_in_processes(job_factory, options, &blk) results_mutex = Mutex.new # arrays are not thread-safe exception = nil - UserInterruptHandler.kill_on_ctrl_c(workers.map(&:pid), options) do + UserInterruptHandler.kill_on_ctrl_c(workers, options) do in_threads(options) do |i| worker = workers[i] worker.thread = Thread.current diff --git a/spec/cases/isolated_interrupt.rb b/spec/cases/isolated_interrupt.rb new file mode 100644 index 0000000..c9c6a75 --- /dev/null +++ b/spec/cases/isolated_interrupt.rb @@ -0,0 +1,13 @@ +# frozen_string_literal: true +require './spec/cases/helper' + +parent_pid = Process.pid +killer_pid = fork do + sleep 1 + Process.kill(:INT, parent_pid) +end +Process.detach(killer_pid) + +Parallel.each([0.1, 5], in_processes: 1, isolation: true) do |sec| + sleep sec +end diff --git a/spec/parallel_spec.rb b/spec/parallel_spec.rb index fcb330a..c213734 100644 --- a/spec/parallel_spec.rb +++ b/spec/parallel_spec.rb @@ -144,7 +144,13 @@ def cpus end.should <= 4 end - it "preserves original intrrupts" do + it "kills replaced workers when handling the interrupt signal" do + time_taken do + ruby("spec/cases/isolated_interrupt.rb 2>&1") + end.should <= 2 + end + + it "preserves original interrupts" do t = Thread.new { ruby("spec/cases/double_interrupt.rb 2>&1 && echo FIN") } sleep 2 kill_process_with_name("spec/cases/double_interrupt.rb") # simulates Ctrl+c @@ -154,7 +160,7 @@ def cpus result.should include("FIN") end - it "restores original intrrupts" do + it "restores original interrupts" do ruby("spec/cases/after_interrupt.rb 2>&1").should == "DEFAULT\n" end From 7fd3772a969f9855f971378d8e45f56bd8d49878 Mon Sep 17 00:00:00 2001 From: Peter Winton Date: Mon, 13 Apr 2026 18:43:06 -0400 Subject: [PATCH 2/5] Verify interrupted process exits with nonzero status --- spec/parallel_spec.rb | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/spec/parallel_spec.rb b/spec/parallel_spec.rb index c213734..3c43941 100644 --- a/spec/parallel_spec.rb +++ b/spec/parallel_spec.rb @@ -146,7 +146,8 @@ def cpus it "kills replaced workers when handling the interrupt signal" do time_taken do - ruby("spec/cases/isolated_interrupt.rb 2>&1") + result = ruby("spec/cases/isolated_interrupt.rb 2>&1 && echo FIN") + result.should_not include("FIN") end.should <= 2 end From 559643cc2fcb244adcbe82f45b2c60cdd6c76f84 Mon Sep 17 00:00:00 2001 From: Peter Winton Date: Mon, 13 Apr 2026 18:49:46 -0400 Subject: [PATCH 3/5] Add description to isolate_interrupt.rb --- spec/cases/isolated_interrupt.rb | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/spec/cases/isolated_interrupt.rb b/spec/cases/isolated_interrupt.rb index c9c6a75..5f571f1 100644 --- a/spec/cases/isolated_interrupt.rb +++ b/spec/cases/isolated_interrupt.rb @@ -1,6 +1,11 @@ # frozen_string_literal: true require './spec/cases/helper' +# Queue two items to be processed in a single worker with isolation enabled. +# Shortly after work begins on the second item, send the process the interrupt signal. +# The process should terminate nearly immediately, indicating the replacement process +# was killed rather than completing work on the second item. + parent_pid = Process.pid killer_pid = fork do sleep 1 From 4fbaa3770ca5d15df87cdd832a786d3d51d929a8 Mon Sep 17 00:00:00 2001 From: Peter Winton Date: Mon, 13 Apr 2026 18:55:50 -0400 Subject: [PATCH 4/5] Factor out wrap_interrupt --- lib/parallel.rb | 62 ++++++++++++++++++++++++++----------------------- 1 file changed, 33 insertions(+), 29 deletions(-) diff --git a/lib/parallel.rb b/lib/parallel.rb index 7d113cb..5113113 100644 --- a/lib/parallel.rb +++ b/lib/parallel.rb @@ -168,45 +168,20 @@ class UserInterruptHandler INTERRUPT_SIGNAL = :SIGINT class << self - # kill all these pids or threads if user presses Ctrl+c + # kill all these workers if user presses Ctrl+c def kill_on_ctrl_c(workers, options) @to_be_killed ||= [] - old_interrupt = nil + old_handler = nil signal = options.fetch(:interrupt_signal, INTERRUPT_SIGNAL) - if @to_be_killed.empty? - # Wrap the existing interrupt handler to kill the workers first. Workers may have been replaced, - # so get the latest pids. - # 1. The worker arrays in @to_be_killed are protected by options[:mutex]. - # 2. Mutexes cannot be obtained in a trap context. - # 3. To preserve semantics, the workers must be killed before the old handler runs. - # 4. To preserve semantics, the old handler must run in a trap context on the main thread. - kill_thread = nil - old_interrupt = Signal.trap(signal) do - next if kill_thread - warn 'Parallel execution interrupted, exiting ...' - kill_thread = Thread.new do - pids = options[:mutex].synchronize do - @to_be_killed.flatten(1).map(&:pid) - # FUTURE: stop JobFactory from spawning new workers - end - pids.each { |pid| kill(pid) } - if old_interrupt == "DEFAULT" - Signal.trap(signal) { raise Interrupt } - else - Signal.trap(signal, old_interrupt) - end - Process.kill(signal, Process.pid) # run the old interrupt handler - end - end || "DEFAULT" - end + old_handler = wrap_interrupt(signal, options[:mutex]) || "DEFAULT" if @to_be_killed.empty? @to_be_killed << workers yield ensure @to_be_killed.pop # do not kill pids that could be used for new processes - Signal.trap(signal, old_interrupt) if @to_be_killed.empty? # restore the old handler on our way out + Signal.trap(signal, old_handler) if old_handler # restore the old handler on our way out end def kill(thing) @@ -215,6 +190,35 @@ def kill(thing) # some linux systems already automatically killed the children at this point # so we just ignore them not being there end + + private + + # Wrap the existing interrupt handler to kill the workers first. Workers may have been replaced, + # so get the latest pids. + # 1. The worker arrays in @to_be_killed are protected by the mutex. + # 2. Mutexes cannot be obtained in a trap context. + # 3. To preserve semantics, the workers must be killed before the old handler runs. + # 4. To preserve semantics, the old handler must run in a trap context on the main thread. + def wrap_interrupt(signal, mutex) + kill_thread = nil + old_handler = Signal.trap(signal) do + next if kill_thread + warn 'Parallel execution interrupted, exiting ...' + kill_thread = Thread.new do + pids = mutex.synchronize do + @to_be_killed.flatten(1).map(&:pid) + # FUTURE: stop JobFactory from spawning new workers + end + pids.each { |pid| kill(pid) } + if old_handler == "DEFAULT" + Signal.trap(signal) { raise Interrupt } + else + Signal.trap(signal, old_handler) + end + Process.kill(signal, Process.pid) # run the old interrupt handler + end + end + end end end From 34ea3f89a7b97e214400e02a1b7cdaad650fb407 Mon Sep 17 00:00:00 2001 From: Peter Winton Date: Mon, 13 Apr 2026 19:29:31 -0400 Subject: [PATCH 5/5] Move handling of nil old_handler to proper location --- lib/parallel.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/parallel.rb b/lib/parallel.rb index 5113113..6f8ff59 100644 --- a/lib/parallel.rb +++ b/lib/parallel.rb @@ -174,7 +174,7 @@ def kill_on_ctrl_c(workers, options) old_handler = nil signal = options.fetch(:interrupt_signal, INTERRUPT_SIGNAL) - old_handler = wrap_interrupt(signal, options[:mutex]) || "DEFAULT" if @to_be_killed.empty? + old_handler = wrap_interrupt(signal, options[:mutex]) if @to_be_killed.empty? @to_be_killed << workers @@ -217,7 +217,7 @@ def wrap_interrupt(signal, mutex) end Process.kill(signal, Process.pid) # run the old interrupt handler end - end + end || "DEFAULT" end end end