diff --git a/lib/parallel.rb b/lib/parallel.rb index f8494d9..6f8ff59 100644 --- a/lib/parallel.rb +++ b/lib/parallel.rb @@ -168,25 +168,20 @@ class UserInterruptHandler INTERRUPT_SIGNAL = :SIGINT class << self - # kill all these pids or threads if user presses Ctrl+c - def kill_on_ctrl_c(pids, options) + # kill all these workers if user presses Ctrl+c + def kill_on_ctrl_c(workers, options) @to_be_killed ||= [] - old_interrupt = nil + old_handler = nil signal = options.fetch(:interrupt_signal, INTERRUPT_SIGNAL) - if @to_be_killed.empty? - old_interrupt = trap_interrupt(signal) do - warn 'Parallel execution interrupted, exiting ...' - @to_be_killed.flatten.each { |pid| kill(pid) } - end - end + old_handler = wrap_interrupt(signal, options[:mutex]) if @to_be_killed.empty? - @to_be_killed << pids + @to_be_killed << workers yield ensure @to_be_killed.pop # do not kill pids that could be used for new processes - restore_interrupt(old_interrupt, signal) if @to_be_killed.empty? + Signal.trap(signal, old_handler) if old_handler # restore the old handler on our way out end def kill(thing) @@ -198,23 +193,31 @@ def kill(thing) private - def trap_interrupt(signal) - old = Signal.trap signal, 'IGNORE' - - Signal.trap signal do - yield - if !old || old == "DEFAULT" - raise Interrupt - else - old.call + # Wrap the existing interrupt handler to kill the workers first. Workers may have been replaced, + # so get the latest pids. + # 1. The worker arrays in @to_be_killed are protected by the mutex. + # 2. Mutexes cannot be obtained in a trap context. + # 3. To preserve semantics, the workers must be killed before the old handler runs. + # 4. To preserve semantics, the old handler must run in a trap context on the main thread. + def wrap_interrupt(signal, mutex) + kill_thread = nil + old_handler = Signal.trap(signal) do + next if kill_thread + warn 'Parallel execution interrupted, exiting ...' + kill_thread = Thread.new do + pids = mutex.synchronize do + @to_be_killed.flatten(1).map(&:pid) + # FUTURE: stop JobFactory from spawning new workers + end + pids.each { |pid| kill(pid) } + if old_handler == "DEFAULT" + Signal.trap(signal) { raise Interrupt } + else + Signal.trap(signal, old_handler) + end + Process.kill(signal, Process.pid) # run the old interrupt handler end - end - - old - end - - def restore_interrupt(old, signal) - Signal.trap signal, old + end || "DEFAULT" end end end @@ -557,7 +560,7 @@ def work_in_processes(job_factory, options, &blk) results_mutex = Mutex.new # arrays are not thread-safe exception = nil - UserInterruptHandler.kill_on_ctrl_c(workers.map(&:pid), options) do + UserInterruptHandler.kill_on_ctrl_c(workers, options) do in_threads(options) do |i| worker = workers[i] worker.thread = Thread.current diff --git a/spec/cases/isolated_interrupt.rb b/spec/cases/isolated_interrupt.rb new file mode 100644 index 0000000..5f571f1 --- /dev/null +++ b/spec/cases/isolated_interrupt.rb @@ -0,0 +1,18 @@ +# frozen_string_literal: true +require './spec/cases/helper' + +# Queue two items to be processed in a single worker with isolation enabled. +# Shortly after work begins on the second item, send the process the interrupt signal. +# The process should terminate nearly immediately, indicating the replacement process +# was killed rather than completing work on the second item. + +parent_pid = Process.pid +killer_pid = fork do + sleep 1 + Process.kill(:INT, parent_pid) +end +Process.detach(killer_pid) + +Parallel.each([0.1, 5], in_processes: 1, isolation: true) do |sec| + sleep sec +end diff --git a/spec/parallel_spec.rb b/spec/parallel_spec.rb index fcb330a..3c43941 100644 --- a/spec/parallel_spec.rb +++ b/spec/parallel_spec.rb @@ -144,7 +144,14 @@ def cpus end.should <= 4 end - it "preserves original intrrupts" do + it "kills replaced workers when handling the interrupt signal" do + time_taken do + result = ruby("spec/cases/isolated_interrupt.rb 2>&1 && echo FIN") + result.should_not include("FIN") + end.should <= 2 + end + + it "preserves original interrupts" do t = Thread.new { ruby("spec/cases/double_interrupt.rb 2>&1 && echo FIN") } sleep 2 kill_process_with_name("spec/cases/double_interrupt.rb") # simulates Ctrl+c @@ -154,7 +161,7 @@ def cpus result.should include("FIN") end - it "restores original intrrupts" do + it "restores original interrupts" do ruby("spec/cases/after_interrupt.rb 2>&1").should == "DEFAULT\n" end