From 63078b6af765fbb5447f8561a96c705e8045dbb1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mos=C3=A8=20Giordano?= Date: Thu, 2 Apr 2026 19:51:42 +0200 Subject: [PATCH 01/15] Add serial test execution support Allow designating specific tests to run sequentially instead of in parallel, useful for memory-hungry tests that cannot safely overlap. New `serial` and `serial_position` keyword arguments on `runtests` control which tests run one-at-a-time and whether the serial phase runs before (default) or after the parallel batch. A new exported `partition_tests` helper splits the ordered test list into serial and parallel groups. Co-authored-by: Claude --- src/ParallelTestRunner.jl | 276 +++++++++++++++++++++++--------------- test/runtests.jl | 136 +++++++++++++++++++ 2 files changed, 305 insertions(+), 107 deletions(-) diff --git a/src/ParallelTestRunner.jl b/src/ParallelTestRunner.jl index a049488..38baf64 100644 --- a/src/ParallelTestRunner.jl +++ b/src/ParallelTestRunner.jl @@ -1,6 +1,6 @@ module ParallelTestRunner -export runtests, addworkers, addworker, find_tests, parse_args, filter_tests! +export runtests, addworkers, addworker, find_tests, parse_args, filter_tests!, partition_tests using Malt using Dates @@ -693,6 +693,24 @@ function filter_tests!(testsuite, args::ParsedArgs) return false end +""" + partition_tests(tests::Vector{String}, serial::Vector{String}) -> (serial_tests, parallel_tests) + +Split `tests` into two ordered vectors: tests named in `serial` (preserving their +order in `tests`) and the remaining parallel tests. Throws `ArgumentError` if any +name in `serial` is not present in `tests`. 
+""" +function partition_tests(tests::Vector{String}, serial::Vector{String}) + serial_set = Set(serial) + unknown = setdiff(serial_set, Set(tests)) + if !isempty(unknown) + throw(ArgumentError("serial test(s) not found in testsuite: $(join(sort!(collect(unknown)), ", "))")) + end + serial_tests = filter(t -> t in serial_set, tests) + parallel_tests = filter(t -> !(t in serial_set), tests) + return serial_tests, parallel_tests +end + """ runtests(mod::Module, args::Union{ParsedArgs,Array{String}}; testsuite::Dict{String,Expr}=find_tests(pwd()), @@ -701,7 +719,9 @@ end test_worker = Returns(nothing), stdout = Base.stdout, stderr = Base.stderr, - max_worker_rss = get_max_worker_rss()) + max_worker_rss = get_max_worker_rss(), + serial = String[], + serial_position::Symbol = :before) runtests(mod::Module, ARGS; ...) Run Julia tests in parallel across multiple worker processes. @@ -725,6 +745,10 @@ Several keyword arguments are also supported: When returning `nothing`, the test will be assigned to any available default worker. - `stdout` and `stderr`: I/O streams to write to (default: `Base.stdout` and `Base.stderr`) - `max_worker_rss`: RSS threshold where a worker will be restarted once it is reached. +- `serial`: A vector of test names (keys of `testsuite`) that should be run one at a time + instead of in parallel. An `ArgumentError` is thrown if any name is not found in the testsuite. +- `serial_position`: When to run serial tests relative to the parallel batch. + Must be `:before` (default) or `:after`. ## Command Line Options @@ -792,6 +816,14 @@ end runtests(MyPackage, args; testsuite) ``` +Run memory-hungry tests serially before the parallel batch +```julia +using ParallelTestRunner +using MyPackage + +runtests(MyPackage, ARGS; serial=["big_alloc_test", "huge_matrix"]) +``` + ## Memory Management Workers are automatically recycled when they exceed memory limits to prevent out-of-memory @@ -800,7 +832,8 @@ issues during long test runs. 
The memory limit is set based on system architectu function runtests(mod::Module, args::ParsedArgs; testsuite::Dict{String,Expr} = find_tests(pwd()), init_code = :(), init_worker_code = :(), test_worker = Returns(nothing), - stdout = Base.stdout, stderr = Base.stderr, max_worker_rss = get_max_worker_rss()) + stdout = Base.stdout, stderr = Base.stderr, max_worker_rss = get_max_worker_rss(), + serial::Vector{String} = String[], serial_position::Symbol = :before) # # set-up # @@ -814,25 +847,33 @@ function runtests(mod::Module, args::ParsedArgs; exit(0) end + # validate serial_position + serial_position in (:before, :after) || + throw(ArgumentError("serial_position must be :before or :after, got :$serial_position")) + # filter tests filter_tests!(testsuite, args) + # filter serial list to only include tests that survived filtering + serial = filter(t -> haskey(testsuite, t), serial) + # determine test order tests = collect(keys(testsuite)) Random.shuffle!(tests) historical_durations = load_test_history(mod) sort!(tests, by = x -> -get(historical_durations, x, Inf)) + # partition into serial and parallel groups + serial_tests, parallel_tests = partition_tests(tests, serial) + # determine parallelism jobs = something(args.jobs, default_njobs()) jobs = clamp(jobs, 1, length(tests)) println(stdout, "Running $(length(tests)) tests using $jobs parallel jobs. 
If this is too many concurrent jobs, specify the `--jobs=N` argument to the tests, or set the `JULIA_CPU_THREADS` environment variable.") - !isnothing(args.verbose) && println(stdout, "Available memory: $(Base.format_bytes(available_memory()))") - sem = Base.Semaphore(max(1, jobs)) - worker_pool = Channel{Union{Nothing, PTRWorker}}(jobs) - for _ in 1:jobs - put!(worker_pool, nothing) + if !isempty(serial_tests) + println(stdout, " $(length(serial_tests)) serial test(s) will run $(serial_position) the parallel batch.") end + !isnothing(args.verbose) && println(stdout, "Available memory: $(Base.format_bytes(available_memory()))") t0 = time() results = [] @@ -1026,110 +1067,139 @@ function runtests(mod::Module, args::ParsedArgs; # execution # - tests_to_start = Threads.Atomic{Int}(length(tests)) - try - @sync for test in tests - push!(worker_tasks, Threads.@spawn begin - local p = nothing - acquired = false - try - Base.acquire(sem) - acquired = true - p = take!(worker_pool) - Threads.atomic_sub!(tests_to_start, 1) - - done && return - - test_t0 = Base.@lock test_lock begin - test_t0 = time() - running_tests[test] = test_t0 - end + function run_phase(phase_tests, phase_jobs) + isempty(phase_tests) && return + done && return - # pass in init_worker_code to custom worker function if defined - wrkr = if init_worker_code == :() - test_worker(test) - else - test_worker(test, init_worker_code) - end - if wrkr === nothing - wrkr = p - end - # if a worker failed, spawn a new one - if wrkr === nothing || !Malt.isrunning(wrkr) - wrkr = p = addworker(; init_worker_code, io_ctx.color) - end + phase_sem = Base.Semaphore(max(1, phase_jobs)) + phase_pool = Channel{Union{Nothing, PTRWorker}}(phase_jobs) + for _ in 1:phase_jobs + put!(phase_pool, nothing) + end + phase_remaining = Threads.Atomic{Int}(length(phase_tests)) - # run the test - put!(printer_channel, (:started, test, worker_id(wrkr))) - result = try - Malt.remote_eval_wait(Main, wrkr.w, :(import ParallelTestRunner)) - 
Malt.remote_call_fetch(invokelatest, wrkr.w, runtest, - testsuite[test], test, init_code, test_t0) - catch ex - if isa(ex, InterruptException) - # the worker got interrupted, signal other tasks to stop - stop_work() - return + try + @sync for test in phase_tests + push!(worker_tasks, Threads.@spawn begin + local p = nothing + acquired = false + try + Base.acquire(phase_sem) + acquired = true + p = take!(phase_pool) + Threads.atomic_sub!(phase_remaining, 1) + + done && return + + test_t0 = Base.@lock test_lock begin + test_t0 = time() + running_tests[test] = test_t0 end - ex - end - test_t1 = time() - output = Base.@lock wrkr.io_lock String(take!(wrkr.io)) - Base.@lock results_lock push!(results, (; test, result, output, test_t0, test_t1)) - - # act on the results - if result isa AbstractTestRecord - put!(printer_channel, (:finished, test, worker_id(wrkr), result)) - if anynonpass(result[]) && args.quickfail !== nothing - stop_work() - return + # pass in init_worker_code to custom worker function if defined + wrkr = if init_worker_code == :() + test_worker(test) + else + test_worker(test, init_worker_code) end - - if memory_usage(result) > max_worker_rss - # the worker has reached the max-rss limit, recycle it - # so future tests start with a smaller working set - Malt.stop(wrkr) + if wrkr === nothing + wrkr = p end - else - # One of Malt.TerminatedWorkerException, Malt.RemoteException, or ErrorException - @assert result isa Exception - put!(printer_channel, (:crashed, test, worker_id(wrkr))) - if args.quickfail !== nothing - stop_work() - return + # if a worker failed, spawn a new one + if wrkr === nothing || !Malt.isrunning(wrkr) + wrkr = p = addworker(; init_worker_code, io_ctx.color) end - # the worker encountered some serious failure, recycle it - Malt.stop(wrkr) - end + # run the test + put!(printer_channel, (:started, test, worker_id(wrkr))) + result = try + Malt.remote_eval_wait(Main, wrkr.w, :(import ParallelTestRunner)) + 
Malt.remote_call_fetch(invokelatest, wrkr.w, runtest, + testsuite[test], test, init_code, test_t0) + catch ex + if isa(ex, InterruptException) + # the worker got interrupted, signal other tasks to stop + stop_work() + return + end + + ex + end + test_t1 = time() + output = Base.@lock wrkr.io_lock String(take!(wrkr.io)) + Base.@lock results_lock push!(results, (; test, result, output, test_t0, test_t1)) + + # act on the results + if result isa AbstractTestRecord + put!(printer_channel, (:finished, test, worker_id(wrkr), result)) + if anynonpass(result[]) && args.quickfail !== nothing + stop_work() + return + end + + if memory_usage(result) > max_worker_rss + # the worker has reached the max-rss limit, recycle it + # so future tests start with a smaller working set + Malt.stop(wrkr) + end + else + # One of Malt.TerminatedWorkerException, Malt.RemoteException, or ErrorException + @assert result isa Exception + put!(printer_channel, (:crashed, test, worker_id(wrkr))) + if args.quickfail !== nothing + stop_work() + return + end + + # the worker encountered some serious failure, recycle it + Malt.stop(wrkr) + end - # get rid of the custom worker - if wrkr != p - Malt.stop(wrkr) - end + # get rid of the custom worker + if wrkr != p + Malt.stop(wrkr) + end - Base.@lock test_lock begin - delete!(running_tests, test) - end - catch ex - isa(ex, InterruptException) || rethrow() - finally - if acquired - # stop the worker if no more tests will need one from the pool - if tests_to_start[] == 0 && p !== nothing && Malt.isrunning(p) - Malt.stop(p) - p = nothing + Base.@lock test_lock begin + delete!(running_tests, test) + end + catch ex + isa(ex, InterruptException) || rethrow() + finally + if acquired + # stop the worker if no more tests will need one from the pool + if phase_remaining[] == 0 && p !== nothing && Malt.isrunning(p) + Malt.stop(p) + p = nothing + end + put!(phase_pool, p) + Base.release(phase_sem) end - put!(worker_pool, p) - Base.release(sem) end - end - end) + 
end) + end + catch err + if !(err isa InterruptException) + println(io_ctx.stderr, "\nCaught an error, stopping...") + end end - catch err - if !(err isa InterruptException) - println(io_ctx.stderr, "\nCaught an error, stopping...") + + # clean up remaining workers in this phase's pool + close(phase_pool) + for p in phase_pool + if p !== nothing && Malt.isrunning(p) + Malt.stop(p) + end + end + end + + try + if serial_position == :before + run_phase(serial_tests, 1) + run_phase(parallel_tests, jobs) + else + run_phase(parallel_tests, jobs) + run_phase(serial_tests, 1) end finally stop_work() @@ -1157,14 +1227,6 @@ function runtests(mod::Module, args::ParsedArgs; end end - # clean up remaining workers in the pool - close(worker_pool) - for p in worker_pool - if p !== nothing && Malt.isrunning(p) - Malt.stop(p) - end - end - # print the output generated by each testset for (testname, result, output, _start, _stop) in results if !isempty(output) diff --git a/test/runtests.jl b/test/runtests.jl index 975c3fa..dad87d3 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -802,4 +802,140 @@ end @test contains(str, "SUCCESS") end +# ── Serial tests ───────────────────────────────────────────────────────────── + +@testset "partition_tests" begin + @testset "basic partitioning preserves order" begin + tests = ["a", "b", "c", "d", "e"] + serial, parallel = partition_tests(tests, ["c", "a"]) + @test serial == ["a", "c"] + @test parallel == ["b", "d", "e"] + end + + @testset "empty serial list" begin + tests = ["x", "y", "z"] + serial, parallel = partition_tests(tests, String[]) + @test isempty(serial) + @test parallel == tests + end + + @testset "all tests serial" begin + tests = ["a", "b"] + serial, parallel = partition_tests(tests, ["a", "b"]) + @test serial == ["a", "b"] + @test isempty(parallel) + end + + @testset "unknown serial name throws" begin + tests = ["a", "b"] + @test_throws ArgumentError partition_tests(tests, ["a", "missing"]) + end +end + +@testset "serial 
tests run before parallel (default)" begin + testsuite = Dict( + "serial_a" => quote + @test true + end, + "serial_b" => quote + @test true + end, + "parallel_1" => quote + @test true + end, + "parallel_2" => quote + @test true + end, + ) + io = IOBuffer() + runtests(ParallelTestRunner, ["--jobs=2", "--verbose"]; + testsuite, stdout=io, stderr=io, + serial=["serial_a", "serial_b"]) + str = String(take!(io)) + @test contains(str, "2 serial test(s) will run before") + @test contains(str, "SUCCESS") +end + +@testset "serial tests run after parallel" begin + testsuite = Dict( + "serial_x" => quote + @test true + end, + "parallel_y" => quote + @test true + end, + ) + io = IOBuffer() + runtests(ParallelTestRunner, ["--jobs=2", "--verbose"]; + testsuite, stdout=io, stderr=io, + serial=["serial_x"], serial_position=:after) + str = String(take!(io)) + @test contains(str, "1 serial test(s) will run after") + @test contains(str, "SUCCESS") +end + +@testset "serial_position validation" begin + testsuite = Dict("a" => :(@test true)) + io = IOBuffer() + @test_throws ArgumentError runtests(ParallelTestRunner, String[]; + testsuite, stdout=io, stderr=io, + serial_position=:middle) +end + +@testset "serial tests actually run sequentially" begin + testsuite = Dict( + "s1" => quote + sleep(0.5) + @test true + end, + "s2" => quote + sleep(0.5) + @test true + end, + "p1" => quote + @test true + end, + ) + io = IOBuffer() + ioc = IOContext(io, :color => true) + t0 = time() + runtests(ParallelTestRunner, ["--jobs=4", "--verbose"]; + testsuite, stdout=ioc, stderr=ioc, + serial=["s1", "s2"]) + elapsed = time() - t0 + str = String(take!(io)) + @test contains(str, "SUCCESS") + # Serial tests sleeping 0.5s each should take >= 1s total (sequential), + # not ~0.5s (parallel). Allow some slack for worker startup. 
+ @test elapsed >= 0.8 +end + +@testset "empty serial list is a no-op" begin + testsuite = Dict( + "a" => :(@test true), + "b" => :(@test true), + ) + io = IOBuffer() + runtests(ParallelTestRunner, ["--jobs=2"]; testsuite, stdout=io, stderr=io, + serial=String[]) + str = String(take!(io)) + @test !contains(str, "serial") + @test contains(str, "SUCCESS") +end + +@testset "serial names filtered by positional args" begin + testsuite = Dict( + "unit/a" => :(@test true), + "unit/b" => :(@test true), + "integration/c" => :(@test true), + ) + io = IOBuffer() + runtests(ParallelTestRunner, ["unit"]; testsuite, stdout=io, stderr=io, + serial=["unit/a", "integration/c"]) + str = String(take!(io)) + @test contains(str, "Running 2 tests") + @test contains(str, "1 serial test(s)") + @test contains(str, "SUCCESS") +end + end From b974dc87fd26524a61bca8dbc7948a508102d014 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mos=C3=A8=20Giordano?= Date: Thu, 2 Apr 2026 20:17:41 +0200 Subject: [PATCH 02/15] Improve tests for serial jobs --- test/runtests.jl | 257 +++++++++++++++++++++++++---------------------- 1 file changed, 135 insertions(+), 122 deletions(-) diff --git a/test/runtests.jl b/test/runtests.jl index dad87d3..d9ed27a 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -804,138 +804,151 @@ end # ── Serial tests ───────────────────────────────────────────────────────────── -@testset "partition_tests" begin - @testset "basic partitioning preserves order" begin - tests = ["a", "b", "c", "d", "e"] - serial, parallel = partition_tests(tests, ["c", "a"]) - @test serial == ["a", "c"] - @test parallel == ["b", "d", "e"] - end +@testset "serial tests" begin + @testset "partition_tests" begin + @testset "basic partitioning preserves order" begin + tests = ["a", "b", "c", "d", "e"] + serial, parallel = partition_tests(tests, ["c", "a"]) + @test serial == ["a", "c"] + @test parallel == ["b", "d", "e"] + end - @testset "empty serial list" begin - tests = ["x", "y", "z"] - serial, parallel 
= partition_tests(tests, String[]) - @test isempty(serial) - @test parallel == tests - end + @testset "empty serial list" begin + tests = ["x", "y", "z"] + serial, parallel = partition_tests(tests, String[]) + @test isempty(serial) + @test parallel == tests + end - @testset "all tests serial" begin - tests = ["a", "b"] - serial, parallel = partition_tests(tests, ["a", "b"]) - @test serial == ["a", "b"] - @test isempty(parallel) - end + @testset "all tests serial" begin + tests = ["a", "b"] + serial, parallel = partition_tests(tests, ["a", "b"]) + @test serial == ["a", "b"] + @test isempty(parallel) + end - @testset "unknown serial name throws" begin - tests = ["a", "b"] - @test_throws ArgumentError partition_tests(tests, ["a", "missing"]) + @testset "unknown serial name throws" begin + tests = ["a", "b"] + @test_throws ArgumentError partition_tests(tests, ["a", "missing"]) + end end -end -@testset "serial tests run before parallel (default)" begin - testsuite = Dict( - "serial_a" => quote - @test true - end, - "serial_b" => quote - @test true - end, - "parallel_1" => quote - @test true - end, - "parallel_2" => quote - @test true - end, - ) - io = IOBuffer() - runtests(ParallelTestRunner, ["--jobs=2", "--verbose"]; - testsuite, stdout=io, stderr=io, - serial=["serial_a", "serial_b"]) - str = String(take!(io)) - @test contains(str, "2 serial test(s) will run before") - @test contains(str, "SUCCESS") -end + @testset "serial tests run before parallel (default)" begin + testsuite = Dict( + "serial_a" => quote + @test true + end, + "serial_b" => quote + @test true + end, + "parallel_1" => quote + @test true + end, + "parallel_2" => quote + @test true + end, + ) + io = IOBuffer() + runtests(ParallelTestRunner, ["--jobs=2", "--verbose"]; + testsuite, stdout=io, stderr=io, + serial=["serial_a", "serial_b"]) + str = String(take!(io)) + @test contains(str, "2 serial test(s) will run before") + @test contains(str, "SUCCESS") + end -@testset "serial tests run after parallel" 
begin - testsuite = Dict( - "serial_x" => quote - @test true - end, - "parallel_y" => quote - @test true - end, - ) - io = IOBuffer() - runtests(ParallelTestRunner, ["--jobs=2", "--verbose"]; - testsuite, stdout=io, stderr=io, - serial=["serial_x"], serial_position=:after) - str = String(take!(io)) - @test contains(str, "1 serial test(s) will run after") - @test contains(str, "SUCCESS") -end + @testset "serial tests run after parallel" begin + testsuite = Dict( + "serial_x" => quote + @test true + end, + "parallel_y" => quote + @test true + end, + ) + io = IOBuffer() + runtests(ParallelTestRunner, ["--jobs=2", "--verbose"]; + testsuite, stdout=io, stderr=io, + serial=["serial_x"], serial_position=:after) + str = String(take!(io)) + @test contains(str, "1 serial test(s) will run after") + @test contains(str, "SUCCESS") + end -@testset "serial_position validation" begin - testsuite = Dict("a" => :(@test true)) - io = IOBuffer() - @test_throws ArgumentError runtests(ParallelTestRunner, String[]; - testsuite, stdout=io, stderr=io, - serial_position=:middle) -end + @testset "serial_position validation" begin + testsuite = Dict("a" => :(@test true)) + io = IOBuffer() + @test_throws ArgumentError runtests(ParallelTestRunner, String[]; + testsuite, stdout=io, stderr=io, + serial_position=:middle) + end -@testset "serial tests actually run sequentially" begin - testsuite = Dict( - "s1" => quote - sleep(0.5) + @testset "serial tests actually run sequentially" begin + serial_test_body = quote + sleep(1.0) @test true - end, - "s2" => quote - sleep(0.5) - @test true - end, - "p1" => quote - @test true - end, - ) - io = IOBuffer() - ioc = IOContext(io, :color => true) - t0 = time() - runtests(ParallelTestRunner, ["--jobs=4", "--verbose"]; - testsuite, stdout=ioc, stderr=ioc, - serial=["s1", "s2"]) - elapsed = time() - t0 - str = String(take!(io)) - @test contains(str, "SUCCESS") - # Serial tests sleeping 0.5s each should take >= 1s total (sequential), - # not ~0.5s (parallel). 
Allow some slack for worker startup. - @test elapsed >= 0.8 -end - -@testset "empty serial list is a no-op" begin - testsuite = Dict( - "a" => :(@test true), - "b" => :(@test true), - ) - io = IOBuffer() - runtests(ParallelTestRunner, ["--jobs=2"]; testsuite, stdout=io, stderr=io, - serial=String[]) - str = String(take!(io)) - @test !contains(str, "serial") - @test contains(str, "SUCCESS") -end + end -@testset "serial names filtered by positional args" begin - testsuite = Dict( - "unit/a" => :(@test true), - "unit/b" => :(@test true), - "integration/c" => :(@test true), - ) - io = IOBuffer() - runtests(ParallelTestRunner, ["unit"]; testsuite, stdout=io, stderr=io, - serial=["unit/a", "integration/c"]) - str = String(take!(io)) - @test contains(str, "Running 2 tests") - @test contains(str, "1 serial test(s)") - @test contains(str, "SUCCESS") + testsuite = Dict( + "s1" => serial_test_body, + "s2" => serial_test_body, + "s3" => serial_test_body, + "p1" => quote + @test true + end, + "p2" => quote + @test true + end, + ) + io = IOBuffer() + ioc = IOContext(io, :color => true) + elapsed = try + @elapsed runtests(ParallelTestRunner, ["--jobs=2", "--verbose"]; + testsuite, stdout=ioc, stderr=ioc, + init_code=:(include($(joinpath(@__DIR__, "utils.jl")))), + serial=["s1", "s2", "s3"]) + catch + # Show output in case of failure, to help debugging. + output = String(take!(io)) + printstyled(stderr, "Output of failed test >>>>>>>>>>>>>>>>>>>>\n", color=:red, bold=true) + println(stderr, output) + printstyled(stderr, "End of output <<<<<<<<<<<<<<<<<<<<<<<<<<<<\n", color=:red, bold=true) + rethrow() + end + str = String(take!(io)) + @test contains(str, "SUCCESS") + # Serial tests sleeping 1.0s each should take >= 3s total (sequential), + # not ~1.0s (parallel). 
+ @test elapsed >= 3.0 + end + + @testset "empty serial list is a no-op" begin + testsuite = Dict( + "a" => :(@test true), + "b" => :(@test true), + ) + io = IOBuffer() + runtests(ParallelTestRunner, ["--jobs=2"]; testsuite, stdout=io, stderr=io, + serial=String[]) + str = String(take!(io)) + @test !contains(str, "serial") + @test contains(str, "SUCCESS") + end + + @testset "serial names filtered by positional args" begin + testsuite = Dict( + "unit/a" => :(@test true), + "unit/b" => :(@test true), + "integration/c" => :(@test true), + ) + io = IOBuffer() + runtests(ParallelTestRunner, ["unit"]; testsuite, stdout=io, stderr=io, + serial=["unit/a", "integration/c"]) + str = String(take!(io)) + @test contains(str, "Running 2 tests") + @test contains(str, "1 serial test(s)") + @test contains(str, "SUCCESS") + end end end From 46c1afcff6bf6d663256fb5f681f573344b6171f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mos=C3=A8=20Giordano?= Date: Thu, 2 Apr 2026 20:43:56 +0200 Subject: [PATCH 03/15] Simplify implementation --- src/ParallelTestRunner.jl | 253 ++++++++++++++++++-------------------- test/runtests.jl | 8 +- 2 files changed, 126 insertions(+), 135 deletions(-) diff --git a/src/ParallelTestRunner.jl b/src/ParallelTestRunner.jl index 38baf64..4baea8f 100644 --- a/src/ParallelTestRunner.jl +++ b/src/ParallelTestRunner.jl @@ -1,6 +1,6 @@ module ParallelTestRunner -export runtests, addworkers, addworker, find_tests, parse_args, filter_tests!, partition_tests +export runtests, addworkers, addworker, find_tests, parse_args, filter_tests! using Malt using Dates @@ -869,6 +869,10 @@ function runtests(mod::Module, args::ParsedArgs; # determine parallelism jobs = something(args.jobs, default_njobs()) jobs = clamp(jobs, 1, length(tests)) + worker_pool = Channel{Union{Nothing, PTRWorker}}(jobs) + for _ in 1:jobs + put!(worker_pool, nothing) + end println(stdout, "Running $(length(tests)) tests using $jobs parallel jobs. 
If this is too many concurrent jobs, specify the `--jobs=N` argument to the tests, or set the `JULIA_CPU_THREADS` environment variable.") if !isempty(serial_tests) println(stdout, " $(length(serial_tests)) serial test(s) will run $(serial_position) the parallel batch.") @@ -883,6 +887,12 @@ function runtests(mod::Module, args::ParsedArgs; worker_tasks = Task[] + tests_semaphores = if serial_position === :before + ((serial_tests, Base.Semaphore(1)), (parallel_tests, Base.Semaphore(max(1, jobs)))) + else + ((parallel_tests, Base.Semaphore(max(1, jobs))), (serial_tests, Base.Semaphore(1))) + end + done = false function stop_work() if !done @@ -1067,139 +1077,112 @@ function runtests(mod::Module, args::ParsedArgs; # execution # - function run_phase(phase_tests, phase_jobs) - isempty(phase_tests) && return - done && return - - phase_sem = Base.Semaphore(max(1, phase_jobs)) - phase_pool = Channel{Union{Nothing, PTRWorker}}(phase_jobs) - for _ in 1:phase_jobs - put!(phase_pool, nothing) - end - phase_remaining = Threads.Atomic{Int}(length(phase_tests)) - - try - @sync for test in phase_tests + tests_to_start = Threads.Atomic{Int}(length(tests)) + try + for (tests, sem) in tests_semaphores + @sync for test in tests push!(worker_tasks, Threads.@spawn begin - local p = nothing - acquired = false - try - Base.acquire(phase_sem) - acquired = true - p = take!(phase_pool) - Threads.atomic_sub!(phase_remaining, 1) - - done && return - - test_t0 = Base.@lock test_lock begin - test_t0 = time() - running_tests[test] = test_t0 - end - - # pass in init_worker_code to custom worker function if defined - wrkr = if init_worker_code == :() - test_worker(test) - else - test_worker(test, init_worker_code) - end - if wrkr === nothing - wrkr = p - end - # if a worker failed, spawn a new one - if wrkr === nothing || !Malt.isrunning(wrkr) - wrkr = p = addworker(; init_worker_code, io_ctx.color) - end - - # run the test - put!(printer_channel, (:started, test, worker_id(wrkr))) - result = try - 
Malt.remote_eval_wait(Main, wrkr.w, :(import ParallelTestRunner)) - Malt.remote_call_fetch(invokelatest, wrkr.w, runtest, - testsuite[test], test, init_code, test_t0) - catch ex - if isa(ex, InterruptException) - # the worker got interrupted, signal other tasks to stop - stop_work() - return - end - - ex - end - test_t1 = time() - output = Base.@lock wrkr.io_lock String(take!(wrkr.io)) - Base.@lock results_lock push!(results, (; test, result, output, test_t0, test_t1)) - - # act on the results - if result isa AbstractTestRecord - put!(printer_channel, (:finished, test, worker_id(wrkr), result)) - if anynonpass(result[]) && args.quickfail !== nothing - stop_work() - return - end - - if memory_usage(result) > max_worker_rss - # the worker has reached the max-rss limit, recycle it - # so future tests start with a smaller working set - Malt.stop(wrkr) - end - else - # One of Malt.TerminatedWorkerException, Malt.RemoteException, or ErrorException - @assert result isa Exception - put!(printer_channel, (:crashed, test, worker_id(wrkr))) - if args.quickfail !== nothing - stop_work() - return - end - - # the worker encountered some serious failure, recycle it - Malt.stop(wrkr) - end - - # get rid of the custom worker - if wrkr != p - Malt.stop(wrkr) - end - - Base.@lock test_lock begin - delete!(running_tests, test) - end - catch ex - isa(ex, InterruptException) || rethrow() - finally - if acquired - # stop the worker if no more tests will need one from the pool - if phase_remaining[] == 0 && p !== nothing && Malt.isrunning(p) - Malt.stop(p) - p = nothing - end - put!(phase_pool, p) - Base.release(phase_sem) - end - end - end) - end - catch err - if !(err isa InterruptException) - println(io_ctx.stderr, "\nCaught an error, stopping...") - end - end - - # clean up remaining workers in this phase's pool - close(phase_pool) - for p in phase_pool - if p !== nothing && Malt.isrunning(p) - Malt.stop(p) + local p = nothing + acquired = false + try + Base.acquire(sem) + acquired = 
true + p = take!(worker_pool) + Threads.atomic_sub!(tests_to_start, 1) + + done && return + + test_t0 = Base.@lock test_lock begin + test_t0 = time() + running_tests[test] = test_t0 + end + + # pass in init_worker_code to custom worker function if defined + wrkr = if init_worker_code == :() + test_worker(test) + else + test_worker(test, init_worker_code) + end + if wrkr === nothing + wrkr = p + end + # if a worker failed, spawn a new one + if wrkr === nothing || !Malt.isrunning(wrkr) + wrkr = p = addworker(; init_worker_code, io_ctx.color) + end + + # run the test + put!(printer_channel, (:started, test, worker_id(wrkr))) + result = try + Malt.remote_eval_wait(Main, wrkr.w, :(import ParallelTestRunner)) + Malt.remote_call_fetch(invokelatest, wrkr.w, runtest, + testsuite[test], test, init_code, test_t0) + catch ex + if isa(ex, InterruptException) + # the worker got interrupted, signal other tasks to stop + stop_work() + return + end + + ex + end + test_t1 = time() + output = Base.@lock wrkr.io_lock String(take!(wrkr.io)) + Base.@lock results_lock push!(results, (; test, result, output, test_t0, test_t1)) + + # act on the results + if result isa AbstractTestRecord + put!(printer_channel, (:finished, test, worker_id(wrkr), result)) + if anynonpass(result[]) && args.quickfail !== nothing + stop_work() + return + end + + if memory_usage(result) > max_worker_rss + # the worker has reached the max-rss limit, recycle it + # so future tests start with a smaller working set + Malt.stop(wrkr) + end + else + # One of Malt.TerminatedWorkerException, Malt.RemoteException, or ErrorException + @assert result isa Exception + put!(printer_channel, (:crashed, test, worker_id(wrkr))) + if args.quickfail !== nothing + stop_work() + return + end + + # the worker encountered some serious failure, recycle it + Malt.stop(wrkr) + end + + # get rid of the custom worker + if wrkr != p + Malt.stop(wrkr) + end + + Base.@lock test_lock begin + delete!(running_tests, test) + end + catch ex + 
isa(ex, InterruptException) || rethrow() + finally + if acquired + # stop the worker if no more tests will need one from the pool + if tests_to_start[] == 0 && p !== nothing && Malt.isrunning(p) + Malt.stop(p) + p = nothing + end + put!(worker_pool, p) + Base.release(sem) + end + end + end) end end - end - - try - if serial_position == :before - run_phase(serial_tests, 1) - run_phase(parallel_tests, jobs) - else - run_phase(parallel_tests, jobs) - run_phase(serial_tests, 1) + catch err + if !(err isa InterruptException) + println(io_ctx.stderr, "\nCaught an error, stopping...") end finally stop_work() @@ -1227,6 +1210,14 @@ function runtests(mod::Module, args::ParsedArgs; end end + # clean up remaining workers in the pool + close(worker_pool) + for p in worker_pool + if p !== nothing && Malt.isrunning(p) + Malt.stop(p) + end + end + # print the output generated by each testset for (testname, result, output, _start, _stop) in results if !isempty(output) diff --git a/test/runtests.jl b/test/runtests.jl index d9ed27a..90ddfbb 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -808,28 +808,28 @@ end @testset "partition_tests" begin @testset "basic partitioning preserves order" begin tests = ["a", "b", "c", "d", "e"] - serial, parallel = partition_tests(tests, ["c", "a"]) + serial, parallel = ParallelTestRunner.partition_tests(tests, ["c", "a"]) @test serial == ["a", "c"] @test parallel == ["b", "d", "e"] end @testset "empty serial list" begin tests = ["x", "y", "z"] - serial, parallel = partition_tests(tests, String[]) + serial, parallel = ParallelTestRunner.partition_tests(tests, String[]) @test isempty(serial) @test parallel == tests end @testset "all tests serial" begin tests = ["a", "b"] - serial, parallel = partition_tests(tests, ["a", "b"]) + serial, parallel = ParallelTestRunner.partition_tests(tests, ["a", "b"]) @test serial == ["a", "b"] @test isempty(parallel) end @testset "unknown serial name throws" begin tests = ["a", "b"] - @test_throws ArgumentError 
partition_tests(tests, ["a", "missing"]) + @test_throws ArgumentError ParallelTestRunner.partition_tests(tests, ["a", "missing"]) end end From 0af9ec81e454aa6b4144fe4bcfed3557c6a5538e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mos=C3=A8=20Giordano?= Date: Thu, 2 Apr 2026 23:09:02 +0200 Subject: [PATCH 04/15] Add documentation for serial execution Co-authored-by: Claude --- docs/src/advanced.md | 49 ++++++++++++++++++++++++++++++++++++++++++++ docs/src/api.md | 5 +++-- docs/src/index.md | 7 +++++++ 3 files changed, 59 insertions(+), 2 deletions(-) diff --git a/docs/src/advanced.md b/docs/src/advanced.md index 5051775..98cbbc5 100644 --- a/docs/src/advanced.md +++ b/docs/src/advanced.md @@ -127,6 +127,53 @@ end # hide ``` The `init_worker_code` is evaluated once per worker, so all definitions can be imported for use by the test module. +## Serial Tests + +Some tests cannot safely run in parallel with other tests — for example, tests that allocate very large arrays and would exhaust memory if multiple ran simultaneously. +The `serial` keyword argument to [`runtests`](@ref) lets you designate specific tests to run one at a time, while the remaining tests still run in parallel. + +```@example mypackage +using ParallelTestRunner +using MyPackage + +testsuite = Dict( + "big_alloc" => quote + # This test allocates ~4 GB and should not overlap with other tests + @test true + end, + "huge_matrix" => quote + @test true + end, + "fast_unit" => quote + @test 1 + 1 == 2 + end, + "fast_integration" => quote + @test true + end, +) + +# "big_alloc" and "huge_matrix" run one at a time; the rest run in parallel +runtests(MyPackage, ARGS; testsuite, serial=["big_alloc", "huge_matrix"]) +``` + +By default serial tests run **before** the parallel batch. 
+Use `serial_position=:after` to run them after instead: + +```@example mypackage +runtests(MyPackage, ARGS; testsuite, serial=["big_alloc", "huge_matrix"], serial_position=:after) +``` + +Serial tests participate in the same ordering logic as parallel tests (sorted by historical +duration, longest first) and their results appear in the same overall summary. + +!!! tip + With automatic test discovery via [`find_tests`](@ref), the `serial` names are the same + keys that appear in the testsuite dictionary (e.g. `"subdir/memory_test"`). + +!!! note + If the user filters tests via positional arguments (e.g. `julia test/runtests.jl unit`), + any serial test names that were filtered out are silently removed from the serial list. + ## Custom Workers For tests that require specific environment variables or Julia flags, you can use the `test_worker` keyword argument to [`runtests`](@ref) to assign tests to custom workers: @@ -254,3 +301,5 @@ function jltest { Having few long-running test files and other short-running ones hinders scalability. 1. **Use custom workers sparingly**: Custom workers add overhead. Only use them when tests genuinely require different configurations. + +1. **Use `serial` for resource-intensive tests**: If a test allocates significant memory or uses exclusive hardware resources, mark it as serial rather than reducing `--jobs` globally. This keeps the rest of your suite running in parallel. diff --git a/docs/src/api.md b/docs/src/api.md index 9890f75..c7ade69 100644 --- a/docs/src/api.md +++ b/docs/src/api.md @@ -39,12 +39,13 @@ addworkers default_njobs ``` -## Internal Types +## Internal Functionalities -These are internal types, not subject to semantic versioning contract (could be changed or removed at any point without notice), not intended for consumption by end-users. 
+These are internal types or functions, not subject to semantic versioning contract (could be changed or removed at any point without notice), not intended for consumption by end-users. They are documented here exclusively for `ParallelTestRunner` developers and contributors. ```@docs ParsedArgs WorkerTestSet +partition_tests ``` diff --git a/docs/src/index.md b/docs/src/index.md index 65349d6..5b83e30 100644 --- a/docs/src/index.md +++ b/docs/src/index.md @@ -107,6 +107,13 @@ Pkg.test("MyPackage"; test_args=`--verbose --jobs=4 integration`) Tests run concurrently in isolated worker processes, each inside own module. `ParallelTestRunner` records historical tests duration for each package, so that in subsequent runs long-running tests are executed first, to improve load balancing. +### Serial Test Support + +Certain tests (e.g. memory-hungry tests) may need to run one at a time. +The `serial` keyword argument to [`runtests`](@ref) lets you designate specific tests +for sequential execution, either before or after the parallel batch. +See [Serial Tests](@ref) in the advanced usage guide for details. + ### Real-time Progress The test runner provides real-time output showing: From c25ed2986b48ed9a2a7e7832b7475752ae6f9589 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mos=C3=A8=20Giordano?= Date: Thu, 2 Apr 2026 23:53:51 +0200 Subject: [PATCH 05/15] [docs] Use `--verbose` in serial tests examples --- docs/src/advanced.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/src/advanced.md b/docs/src/advanced.md index 98cbbc5..2ac2e93 100644 --- a/docs/src/advanced.md +++ b/docs/src/advanced.md @@ -153,14 +153,14 @@ testsuite = Dict( ) # "big_alloc" and "huge_matrix" run one at a time; the rest run in parallel -runtests(MyPackage, ARGS; testsuite, serial=["big_alloc", "huge_matrix"]) +runtests(MyPackage, ["--verbose"]; testsuite, serial=["big_alloc", "huge_matrix"]) ``` By default serial tests run **before** the parallel batch. 
Use `serial_position=:after` to run them after instead: ```@example mypackage -runtests(MyPackage, ARGS; testsuite, serial=["big_alloc", "huge_matrix"], serial_position=:after) +runtests(MyPackage, ["--verbose"]; testsuite, serial=["big_alloc", "huge_matrix"], serial_position=:after) ``` Serial tests participate in the same ordering logic as parallel tests (sorted by historical From aa83bc3b0137fd37e06023fd64e0f69082fe17f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mos=C3=A8=20Giordano?= Date: Fri, 3 Apr 2026 00:04:47 +0200 Subject: [PATCH 06/15] Cap number of parallel jobs to number of parallel tests --- src/ParallelTestRunner.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ParallelTestRunner.jl b/src/ParallelTestRunner.jl index 4baea8f..46119ab 100644 --- a/src/ParallelTestRunner.jl +++ b/src/ParallelTestRunner.jl @@ -868,7 +868,7 @@ function runtests(mod::Module, args::ParsedArgs; # determine parallelism jobs = something(args.jobs, default_njobs()) - jobs = clamp(jobs, 1, length(tests)) + jobs = clamp(jobs, 1, length(parallel_tests)) worker_pool = Channel{Union{Nothing, PTRWorker}}(jobs) for _ in 1:jobs put!(worker_pool, nothing) From 7dc58346c6df0f72fa902778ce0978317a4c2671 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mos=C3=A8=20Giordano?= Date: Fri, 3 Apr 2026 00:05:21 +0200 Subject: [PATCH 07/15] Add some more tests for serial jobs --- test/runtests.jl | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/test/runtests.jl b/test/runtests.jl index 90ddfbb..ae817eb 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -849,12 +849,15 @@ end end, ) io = IOBuffer() - runtests(ParallelTestRunner, ["--jobs=2", "--verbose"]; + jobs = 2 + old_id_counter = ParallelTestRunner.ID_COUNTER[] + runtests(ParallelTestRunner, ["--jobs=$(jobs)", "--verbose"]; testsuite, stdout=io, stderr=io, serial=["serial_a", "serial_b"]) str = String(take!(io)) @test contains(str, "2 serial test(s) will run before") @test contains(str, "SUCCESS") 
+ @test ParallelTestRunner.ID_COUNTER[] == old_id_counter + jobs end @testset "serial tests run after parallel" begin @@ -902,8 +905,10 @@ end ) io = IOBuffer() ioc = IOContext(io, :color => true) + old_id_counter = ParallelTestRunner.ID_COUNTER[] + jobs = 2 elapsed = try - @elapsed runtests(ParallelTestRunner, ["--jobs=2", "--verbose"]; + @elapsed runtests(ParallelTestRunner, ["--jobs=$(jobs)", "--verbose"]; testsuite, stdout=ioc, stderr=ioc, init_code=:(include($(joinpath(@__DIR__, "utils.jl")))), serial=["s1", "s2", "s3"]) @@ -917,6 +922,7 @@ end end str = String(take!(io)) @test contains(str, "SUCCESS") + @test ParallelTestRunner.ID_COUNTER[] == old_id_counter + jobs # Serial tests sleeping 1.0s each should take >= 3s total (sequential), # not ~1.0s (parallel). @test elapsed >= 3.0 From f6f0796c5d1071dc3d261f5cb00f843e7a9e8ce6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mos=C3=A8=20Giordano?= Date: Fri, 3 Apr 2026 16:53:07 +0200 Subject: [PATCH 08/15] Add test for case of parallel tests less than requested jobs --- test/runtests.jl | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/test/runtests.jl b/test/runtests.jl index ae817eb..6dd137d 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -941,6 +941,26 @@ end @test contains(str, "SUCCESS") end + @testset "parallel tests less than requested jobs" begin + testsuite = Dict( + "s1" => :(), + "s2" => :(), + "p1" => :(), + "p2" => :(), + ); + io = IOBuffer() + old_id_counter = ParallelTestRunner.ID_COUNTER[] + runtests(ParallelTestRunner, ["--jobs=3"]; testsuite, stdout=io, stderr=io, + serial=["s1", "s2"]) + str = String(take!(io)) + # We have 4 total tests, requested 3 jobs, but only 2 tests are run in parallel, so + # 2 is the maximum parallelism we expect, and the number of new workers we spawn. 
+ @test contains(str, "Running 4 tests using 2 parallel jobs") + @test contains(str, "2 serial test(s)") + @test contains(str, "SUCCESS") + @test ParallelTestRunner.ID_COUNTER[] == old_id_counter + 2 + end + @testset "serial names filtered by positional args" begin testsuite = Dict( "unit/a" => :(@test true), From 415de6ba11b8b43869f17bc1ad516111736a7ab9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mos=C3=A8=20Giordano?= Date: Fri, 3 Apr 2026 16:54:54 +0200 Subject: [PATCH 09/15] Improve test for serial names filtered by positional args `integration/c` should not be run. --- test/runtests.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/runtests.jl b/test/runtests.jl index 6dd137d..b727d80 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -965,7 +965,7 @@ end testsuite = Dict( "unit/a" => :(@test true), "unit/b" => :(@test true), - "integration/c" => :(@test true), + "integration/c" => :(@test false), ) io = IOBuffer() runtests(ParallelTestRunner, ["unit"]; testsuite, stdout=io, stderr=io, From 2a9d3e3838b12281b935557ee37b3a5e35e4bf78 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mos=C3=A8=20Giordano?= Date: Fri, 3 Apr 2026 18:01:18 +0200 Subject: [PATCH 10/15] Fix case of all tests serial --- src/ParallelTestRunner.jl | 2 +- test/runtests.jl | 19 +++++++++++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/src/ParallelTestRunner.jl b/src/ParallelTestRunner.jl index 46119ab..0042066 100644 --- a/src/ParallelTestRunner.jl +++ b/src/ParallelTestRunner.jl @@ -868,7 +868,7 @@ function runtests(mod::Module, args::ParsedArgs; # determine parallelism jobs = something(args.jobs, default_njobs()) - jobs = clamp(jobs, 1, length(parallel_tests)) + jobs = clamp(jobs, 1, max(1, length(parallel_tests))) worker_pool = Channel{Union{Nothing, PTRWorker}}(jobs) for _ in 1:jobs put!(worker_pool, nothing) diff --git a/test/runtests.jl b/test/runtests.jl index b727d80..314329b 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -928,6 
+928,25 @@ end @test elapsed >= 3.0 end + @testset "all tests serial" begin + testsuite = Dict( + "a" => :(), + "b" => :(), + "c" => :(), + "d" => :(), + ) + io = IOBuffer() + old_id_counter = ParallelTestRunner.ID_COUNTER[] + runtests(ParallelTestRunner, ["--jobs=3", "--verbose"]; + testsuite, stdout=io, stderr=io, + serial=["a", "b", "c", "d"]) + str = String(take!(io)) + @test contains(str, "Running 4 tests using 1 parallel jobs") + @test contains(str, "4 serial test(s) will run before") + @test contains(str, "SUCCESS") + @test ParallelTestRunner.ID_COUNTER[] == old_id_counter + 1 + end + @testset "empty serial list is a no-op" begin testsuite = Dict( "a" => :(@test true), From 3d1a844987007968e2236d3a9a76b74b66ec4d63 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mos=C3=A8=20Giordano?= Date: Fri, 3 Apr 2026 18:44:08 +0200 Subject: [PATCH 11/15] Reuse same worker for all the serial tests Co-authored-by: Claude --- src/ParallelTestRunner.jl | 40 ++++++++++++++++++++++++++++----------- 1 file changed, 29 insertions(+), 11 deletions(-) diff --git a/src/ParallelTestRunner.jl b/src/ParallelTestRunner.jl index 0042066..a6390ed 100644 --- a/src/ParallelTestRunner.jl +++ b/src/ParallelTestRunner.jl @@ -887,10 +887,14 @@ function runtests(mod::Module, args::ParsedArgs; worker_tasks = Task[] - tests_semaphores = if serial_position === :before - ((serial_tests, Base.Semaphore(1)), (parallel_tests, Base.Semaphore(max(1, jobs)))) + serial_worker = Ref{Union{Nothing, PTRWorker}}(nothing) + + test_phases = if serial_position === :before + ((serial_tests, Base.Semaphore(1), serial_worker), + (parallel_tests, Base.Semaphore(max(1, jobs)), nothing)) else - ((parallel_tests, Base.Semaphore(max(1, jobs))), (serial_tests, Base.Semaphore(1))) + ((parallel_tests, Base.Semaphore(max(1, jobs)), nothing), + (serial_tests, Base.Semaphore(1), serial_worker)) end done = false @@ -1079,15 +1083,20 @@ function runtests(mod::Module, args::ParsedArgs; tests_to_start = 
Threads.Atomic{Int}(length(tests)) try - for (tests, sem) in tests_semaphores - @sync for test in tests + for (phase_tests, sem, shared_worker) in test_phases + isempty(phase_tests) && continue + # for serial phases, reserve one pool slot for the shared worker + if !isnothing(shared_worker) + shared_worker[] = take!(worker_pool) + end + @sync for test in phase_tests push!(worker_tasks, Threads.@spawn begin local p = nothing acquired = false try Base.acquire(sem) acquired = true - p = take!(worker_pool) + p = !isnothing(shared_worker) ? shared_worker[] : take!(worker_pool) Threads.atomic_sub!(tests_to_start, 1) done && return @@ -1168,17 +1177,26 @@ function runtests(mod::Module, args::ParsedArgs; isa(ex, InterruptException) || rethrow() finally if acquired - # stop the worker if no more tests will need one from the pool - if tests_to_start[] == 0 && p !== nothing && Malt.isrunning(p) - Malt.stop(p) - p = nothing + if !isnothing(shared_worker) + shared_worker[] = p + else + # stop the worker if no more tests will need one from the pool + if tests_to_start[] == 0 && p !== nothing && Malt.isrunning(p) + Malt.stop(p) + p = nothing + end + put!(worker_pool, p) end - put!(worker_pool, p) Base.release(sem) end end end) end + # return the serial worker to the pool for potential reuse + if !isnothing(shared_worker) + put!(worker_pool, shared_worker[]) + shared_worker[] = nothing + end end catch err if !(err isa InterruptException) From 6e002f33a861eee23762e835f5c486b1f0a77533 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mos=C3=A8=20Giordano?= Date: Fri, 3 Apr 2026 18:44:45 +0200 Subject: [PATCH 12/15] Add tests for checking serial tests run alone --- test/runtests.jl | 49 ++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 43 insertions(+), 6 deletions(-) diff --git a/test/runtests.jl b/test/runtests.jl index 314329b..ab4706e 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -890,18 +890,19 @@ end serial_test_body = quote sleep(1.0) @test true + children = 
_count_child_pids($(getpid())) + # Make sure serial tests run alone. + if children >= 0 + @test children == 1 + end end testsuite = Dict( "s1" => serial_test_body, "s2" => serial_test_body, "s3" => serial_test_body, - "p1" => quote - @test true - end, - "p2" => quote - @test true - end, + "p1" => :( @test true ), + "p2" => :( @test true ), ) io = IOBuffer() ioc = IOContext(io, :color => true) @@ -994,6 +995,42 @@ end @test contains(str, "1 serial test(s)") @test contains(str, "SUCCESS") end + + @testset "crashing serial test" begin + serial_test_body = quote + children = _count_child_pids($(getpid())) + # Make sure serial tests run alone. + if children >= 0 + @test children == 1 + end + end + + testsuite = Dict( + "s1" => serial_test_body, + "s2" => serial_test_body, + "s3" => serial_test_body, + "s4" => :(ccall(:abort, Nothing, ())), + "p1" => :(), + "p2" => :(), + ) + io = IOBuffer() + ioc = IOContext(io, :color => true) + old_id_counter = ParallelTestRunner.ID_COUNTER[] + jobs = 2 + @test_throws Test.FallbackTestSetException("Test run finished with errors") begin + runtests(ParallelTestRunner, ["--jobs=$(jobs)", "--verbose"]; + testsuite, stdout=ioc, stderr=ioc, + init_code=:(include($(joinpath(@__DIR__, "utils.jl")))), + serial=["s1", "s2", "s3", "s4"]) + end + str = String(take!(io)) + @test contains(str, "Running 6 tests using 2 parallel jobs") + @test contains(str, "4 serial test(s)") + @test contains(str, "FAILURE") + # We'll use jobs + 1 workers because one will crash. 
+ @test ParallelTestRunner.ID_COUNTER[] == old_id_counter + jobs + 1 + end + end end From 0fa5f49e03787e006030cd6b411bafb6ce2fd0d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mos=C3=A8=20Giordano?= Date: Fri, 3 Apr 2026 18:48:00 +0200 Subject: [PATCH 13/15] Slightly simplify some tests --- test/runtests.jl | 42 +++++++++++++++--------------------------- 1 file changed, 15 insertions(+), 27 deletions(-) diff --git a/test/runtests.jl b/test/runtests.jl index ab4706e..b7f8c34 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -706,7 +706,7 @@ end # ── Integration tests ──────────────────────────────────────────────────────── @testset "non-verbose mode" begin - testsuite = Dict("quiet" => quote @test true end) + testsuite = Dict("quiet" => :()) io = IOBuffer() runtests(ParallelTestRunner, String[]; testsuite, stdout=io, stderr=io) str = String(take!(io)) @@ -835,18 +835,10 @@ end @testset "serial tests run before parallel (default)" begin testsuite = Dict( - "serial_a" => quote - @test true - end, - "serial_b" => quote - @test true - end, - "parallel_1" => quote - @test true - end, - "parallel_2" => quote - @test true - end, + "serial_a" => :(), + "serial_b" => :(), + "parallel_1" => :(), + "parallel_2" => :(), ) io = IOBuffer() jobs = 2 @@ -862,12 +854,8 @@ end @testset "serial tests run after parallel" begin testsuite = Dict( - "serial_x" => quote - @test true - end, - "parallel_y" => quote - @test true - end, + "serial_x" => :(), + "parallel_y" => :(), ) io = IOBuffer() runtests(ParallelTestRunner, ["--jobs=2", "--verbose"]; @@ -879,7 +867,7 @@ end end @testset "serial_position validation" begin - testsuite = Dict("a" => :(@test true)) + testsuite = Dict("a" => :()) io = IOBuffer() @test_throws ArgumentError runtests(ParallelTestRunner, String[]; testsuite, stdout=io, stderr=io, @@ -889,7 +877,6 @@ end @testset "serial tests actually run sequentially" begin serial_test_body = quote sleep(1.0) - @test true children = _count_child_pids($(getpid())) # Make sure 
serial tests run alone. if children >= 0 @@ -901,8 +888,8 @@ end "s1" => serial_test_body, "s2" => serial_test_body, "s3" => serial_test_body, - "p1" => :( @test true ), - "p2" => :( @test true ), + "p1" => :(), + "p2" => :(), ) io = IOBuffer() ioc = IOContext(io, :color => true) @@ -950,8 +937,8 @@ end @testset "empty serial list is a no-op" begin testsuite = Dict( - "a" => :(@test true), - "b" => :(@test true), + "a" => :(), + "b" => :(), ) io = IOBuffer() runtests(ParallelTestRunner, ["--jobs=2"]; testsuite, stdout=io, stderr=io, @@ -983,8 +970,9 @@ end @testset "serial names filtered by positional args" begin testsuite = Dict( - "unit/a" => :(@test true), - "unit/b" => :(@test true), + "unit/a" => :(), + "unit/b" => :(), + # This test file shouldn't be called, we use `@test false` to make sure it's not. "integration/c" => :(@test false), ) io = IOBuffer() From 029a2617564db583e0a28e110d9a6aee65612b35 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mos=C3=A8=20Giordano?= Date: Fri, 3 Apr 2026 19:06:45 +0200 Subject: [PATCH 14/15] Also test serial tests run after are alone --- test/runtests.jl | 34 ++++++++++++++++++++++++++++++---- 1 file changed, 30 insertions(+), 4 deletions(-) diff --git a/test/runtests.jl b/test/runtests.jl index b7f8c34..a168ea0 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -1,6 +1,21 @@ using ParallelTestRunner using Test +macro show_if_error(io, expr) + quote + try + @elapsed $(esc(expr)) + catch + # Show output in case of failure, to help debugging.
+ output = String(take!($(esc(io)))) + printstyled(stderr, "Output of failed test >>>>>>>>>>>>>>>>>>>>\n", color=:red, bold=true) + println(stderr, output) + printstyled(stderr, "End of output <<<<<<<<<<<<<<<<<<<<<<<<<<<<\n", color=:red, bold=true) + rethrow() + end + end +end + cd(@__DIR__) include(joinpath(@__DIR__, "utils.jl")) @@ -854,16 +869,27 @@ end @testset "serial tests run after parallel" begin testsuite = Dict( - "serial_x" => :(), + "serial_x" => quote + children = _count_child_pids($(getpid())) + # Make sure serial tests run alone. + if children >= 0 + @test children == 1 + end + end, "parallel_y" => :(), ) io = IOBuffer() - runtests(ParallelTestRunner, ["--jobs=2", "--verbose"]; - testsuite, stdout=io, stderr=io, - serial=["serial_x"], serial_position=:after) + ioc = IOContext(io, :color => true) + old_id_counter = ParallelTestRunner.ID_COUNTER[] + @show_if_error io runtests(ParallelTestRunner, ["--jobs=2", "--verbose"]; + testsuite, stdout=ioc, stderr=ioc, + init_code=:(include($(joinpath(@__DIR__, "utils.jl")))), + serial=["serial_x"], serial_position=:after) str = String(take!(io)) + @test contains(str, "Running 2 tests using 1 parallel jobs") @test contains(str, "1 serial test(s) will run after") @test contains(str, "SUCCESS") + @test ParallelTestRunner.ID_COUNTER[] == old_id_counter + 1 end @testset "serial_position validation" begin From 7b2ea8fcc1e5d9e123d2da7a6180a60f727b16e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mos=C3=A8=20Giordano?= Date: Fri, 3 Apr 2026 19:08:52 +0200 Subject: [PATCH 15/15] Use `@show_if_error` macro in more places --- test/runtests.jl | 32 ++++++++------------------------ 1 file changed, 8 insertions(+), 24 deletions(-) diff --git a/test/runtests.jl b/test/runtests.jl index a168ea0..67bde9c 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -1,12 +1,12 @@ using ParallelTestRunner using Test +# Helper macro to show output of tests in case they fail. Useful for debugging. 
macro show_if_error(io, expr) quote try @elapsed $(esc(expr)) catch - # Show output in case of failure, to help debugging. output = String(take!($(esc(io)))) printstyled(stderr, "Output of failed test >>>>>>>>>>>>>>>>>>>>\n", color=:red, bold=true) println(stderr, output) @@ -490,17 +490,8 @@ end njobs = 2 io = IOBuffer() ioc = IOContext(io, :color => true) - try - runtests(ParallelTestRunner, ["--jobs=$(njobs)", "--verbose"]; - testsuite, stdout=ioc, stderr=ioc, init_code=:(include($(joinpath(@__DIR__, "utils.jl"))))) - catch - # Show output in case of failure, to help debugging. - output = String(take!(io)) - printstyled(stderr, "Output of failed test >>>>>>>>>>>>>>>>>>>>\n", color=:red, bold=true) - println(stderr, output) - printstyled(stderr, "End of output <<<<<<<<<<<<<<<<<<<<<<<<<<<<\n", color=:red, bold=true) - rethrow() - end + @show_if_error io runtests(ParallelTestRunner, ["--jobs=$(njobs)", "--verbose"]; + testsuite, stdout=ioc, stderr=ioc, init_code=:(include($(joinpath(@__DIR__, "utils.jl"))))) # Make sure we didn't spawn more workers than expected. @test ParallelTestRunner.ID_COUNTER[] == old_id_counter + njobs # Allow a moment for worker processes to exit @@ -921,18 +912,11 @@ end ioc = IOContext(io, :color => true) old_id_counter = ParallelTestRunner.ID_COUNTER[] jobs = 2 - elapsed = try - @elapsed runtests(ParallelTestRunner, ["--jobs=$(jobs)", "--verbose"]; - testsuite, stdout=ioc, stderr=ioc, - init_code=:(include($(joinpath(@__DIR__, "utils.jl")))), - serial=["s1", "s2", "s3"]) - catch - # Show output in case of failure, to help debugging. 
- output = String(take!(io)) - printstyled(stderr, "Output of failed test >>>>>>>>>>>>>>>>>>>>\n", color=:red, bold=true) - println(stderr, output) - printstyled(stderr, "End of output <<<<<<<<<<<<<<<<<<<<<<<<<<<<\n", color=:red, bold=true) - rethrow() + elapsed = @elapsed begin + @show_if_error io runtests(ParallelTestRunner, ["--jobs=$(jobs)", "--verbose"]; + testsuite, stdout=ioc, stderr=ioc, + init_code=:(include($(joinpath(@__DIR__, "utils.jl")))), + serial=["s1", "s2", "s3"]) end str = String(take!(io)) @test contains(str, "SUCCESS")