Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 31 additions & 28 deletions lib/parallel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -168,25 +168,20 @@ class UserInterruptHandler
INTERRUPT_SIGNAL = :SIGINT

class << self
# kill all these pids or threads if user presses Ctrl+c
def kill_on_ctrl_c(pids, options)
# kill all these workers if user presses Ctrl+c
def kill_on_ctrl_c(workers, options)
@to_be_killed ||= []
old_interrupt = nil
old_handler = nil
signal = options.fetch(:interrupt_signal, INTERRUPT_SIGNAL)

if @to_be_killed.empty?
old_interrupt = trap_interrupt(signal) do
warn 'Parallel execution interrupted, exiting ...'
@to_be_killed.flatten.each { |pid| kill(pid) }
end
end
old_handler = wrap_interrupt(signal, options[:mutex]) if @to_be_killed.empty?

@to_be_killed << pids
@to_be_killed << workers

yield
ensure
@to_be_killed.pop # do not kill pids that could be used for new processes
restore_interrupt(old_interrupt, signal) if @to_be_killed.empty?
Signal.trap(signal, old_handler) if old_handler # restore the old handler on our way out
end

def kill(thing)
Expand All @@ -198,23 +193,31 @@ def kill(thing)

private

def trap_interrupt(signal)
old = Signal.trap signal, 'IGNORE'

Signal.trap signal do
yield
if !old || old == "DEFAULT"
raise Interrupt
else
old.call
# Wrap the existing interrupt handler to kill the workers first. Workers may have been replaced,
# so get the latest pids.
# 1. The worker arrays in @to_be_killed are protected by the mutex.
# 2. Mutexes cannot be obtained in a trap context.
# 3. To preserve semantics, the workers must be killed before the old handler runs.
# 4. To preserve semantics, the old handler must run in a trap context on the main thread.
def wrap_interrupt(signal, mutex)
kill_thread = nil
old_handler = Signal.trap(signal) do
next if kill_thread
warn 'Parallel execution interrupted, exiting ...'
kill_thread = Thread.new do
pids = mutex.synchronize do
@to_be_killed.flatten(1).map(&:pid)
# FUTURE: stop JobFactory from spawning new workers
end
pids.each { |pid| kill(pid) }
if old_handler == "DEFAULT"
Signal.trap(signal) { raise Interrupt }
else
Signal.trap(signal, old_handler)
end
Process.kill(signal, Process.pid) # run the old interrupt handler
end
end

old
end

def restore_interrupt(old, signal)
Signal.trap signal, old
end || "DEFAULT"
end
end
end
Expand Down Expand Up @@ -557,7 +560,7 @@ def work_in_processes(job_factory, options, &blk)
results_mutex = Mutex.new # arrays are not thread-safe
exception = nil

UserInterruptHandler.kill_on_ctrl_c(workers.map(&:pid), options) do
UserInterruptHandler.kill_on_ctrl_c(workers, options) do
in_threads(options) do |i|
worker = workers[i]
worker.thread = Thread.current
Expand Down
18 changes: 18 additions & 0 deletions spec/cases/isolated_interrupt.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# frozen_string_literal: true
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some description here what this is doing/simulating

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added.

require './spec/cases/helper'

# Queue two items to be processed in a single worker with isolation enabled.
# Shortly after work begins on the second item, send the process the interrupt signal.
# The process should terminate nearly immediately, indicating the replacement process
# was killed rather than completing work on the second item.

parent_pid = Process.pid
killer_pid = fork do
sleep 1
Process.kill(:INT, parent_pid)
end
Process.detach(killer_pid)

Parallel.each([0.1, 5], in_processes: 1, isolation: true) do |sec|
sleep sec
end
11 changes: 9 additions & 2 deletions spec/parallel_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,14 @@ def cpus
end.should <= 4
end

it "preserves original intrrupts" do
it "kills replaced workers when handling the interrupt signal" do
time_taken do
result = ruby("spec/cases/isolated_interrupt.rb 2>&1 && echo FIN")
result.should_not include("FIN")
end.should <= 2
Comment thread
grosser marked this conversation as resolved.
end

it "preserves original interrupts" do
t = Thread.new { ruby("spec/cases/double_interrupt.rb 2>&1 && echo FIN") }
sleep 2
kill_process_with_name("spec/cases/double_interrupt.rb") # simulates Ctrl+c
Expand All @@ -154,7 +161,7 @@ def cpus
result.should include("FIN")
end

it "restores original intrrupts" do
it "restores original interrupts" do
ruby("spec/cases/after_interrupt.rb 2>&1").should == "DEFAULT\n"
end

Expand Down