Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ uuid = "fab6aee4-877b-4bac-a744-3eca44acbb6f"
version = "1.2.0"

[deps]
PrecompileTools = "aea7be01-6a6a-4083-8856-8a6e6704d82a"
Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
ScopedValues = "7e506255-f358-4e82-b7e4-beb19740aa63"
Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b"
Expand All @@ -19,6 +20,7 @@ Aqua = "0.8"
Distributed = "1"
LibSSH = "0.7"
LinearAlgebra = "1"
PrecompileTools = "1"
Random = "1"
Revise = "3.7.0"
ScopedValues = "1.6.0"
Expand Down
1 change: 1 addition & 0 deletions docs/src/_changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ This documents notable changes in DistributedNext.jl. The format is based on
single struct ([#61]). This should not be a user-visible change, but of course
it's possible that some things slipped through the cracks so please open an
issue if you encounter any bugs.
- A precompilation workload was added to improve TTFX ([#62]).

## [v1.2.0] - 2026-03-21

Expand Down
5 changes: 4 additions & 1 deletion ext/ReviseExt.jl
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ module ReviseExt
import DistributedNext
import DistributedNext: myid, workers, remotecall

import Revise
using PrecompileTools: @recompile_invalidations

# Sadly Revise causes quite a few invalidations. TODO: make DistributedNext more
# resistant to invalidations.
@recompile_invalidations import Revise

struct DistributedNextWorker <: Revise.AbstractWorker
id::Int
Expand Down
51 changes: 41 additions & 10 deletions src/DistributedNext.jl
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ function _check_distributed_active()
return false
end

if isdefined(Base.loaded_modules[distributed_pkgid].LPROC, :cookie) && CTX.inited[]
if isdefined(Base.loaded_modules[distributed_pkgid].LPROC, :cookie) && CTX[].inited[]
@warn "DistributedNext has detected that the Distributed stdlib may be in use. Be aware that these libraries are not compatible, you should use either one or the other."
return true
else
Expand Down Expand Up @@ -123,7 +123,7 @@ Base.lock(l::Lockable) = lock(l.lock)
Base.trylock(l::Lockable) = trylock(l.lock)
Base.unlock(l::Lockable) = unlock(l.lock)

next_ref_id() = Threads.atomic_add!(CTX.ref_id, 1)
next_ref_id() = Threads.atomic_add!(CTX[].ref_id, 1)

struct RRID
whence::Int
Expand All @@ -146,12 +146,11 @@ include("macros.jl") # @spawn and friends
include("workerpool.jl")
include("pmap.jl")
include("managers.jl") # LocalManager and SSHManager
include("precompile.jl")

# Bundles all mutable global state for a distributed cluster into a single
# object. Currently a single global instance (`CTX`) is used, but multiple
# independent clusters could be supported in the future.
@kwdef struct ClusterContext
# object. The active context is accessed via the `CTX` ScopedValue, allowing
# multiple independent clusters to coexist in different task scopes.
@kwdef mutable struct ClusterContext
# Process identity
lproc::LocalProcess = LocalProcess()
role::Ref{Symbol} = Ref{Symbol}(:master)
Expand Down Expand Up @@ -204,11 +203,42 @@ include("precompile.jl")
# Scoped value for exited callback pid
exited_callback_pid::ScopedValue{Int} = ScopedValue(-1)

# GC messages task
shutting_down::Threads.Atomic{Bool} = Threads.Atomic{Bool}(false)
gc_msgs_task::Union{Task, Nothing} = nothing

# Stdlib watcher
stdlib_watcher_timer::Ref{Union{Timer, Nothing}} = Ref{Union{Timer, Nothing}}(nothing)
stdlib_watcher_timer::Union{Timer, Nothing} = nothing
end

"""
    ClusterContext(f::Base.Callable; kwargs...)

Create a fresh `ClusterContext`, run `f()` with it installed as the active
context (via the `CTX` scoped value), and clean the context up afterwards.
Returns whatever `f()` returns.

The context is closed even if `f` throws, so background resources (the GC
messages task, the stdlib watcher timer, and any tracked sockets) are never
leaked.
"""
function ClusterContext(f::Base.Callable; kwargs...)
    ctx = ClusterContext(; kwargs...)

    # Use try/finally so that an exception from `f()` still tears down the
    # context; the original straight-line `close(ctx)` would be skipped on
    # throw, leaking the context's tasks/timers/sockets.
    try
        return @with CTX => ctx f()
    finally
        close(ctx)
    end
end

const CTX = ClusterContext()
"""
    Base.close(ctx::ClusterContext)

Tear down a cluster context: flag shutdown, stop the GC-messages task and the
stdlib watcher timer if they are running, and close every tracked socket.
"""
function Base.close(ctx::ClusterContext)
    # Let any background machinery observe that we are shutting down.
    ctx.shutting_down[] = true

    # If the GC-messages task was started, wake it up and wait for it to
    # finish before continuing the teardown.
    gc_task = ctx.gc_msgs_task
    if gc_task !== nothing
        @lock ctx.any_gc_flag notify(ctx.any_gc_flag)
        wait(gc_task::Task)
    end

    # Stop the Distributed-stdlib watcher timer, if one was created.
    timer = ctx.stdlib_watcher_timer
    timer === nothing || close(timer::Timer)

    # Close every socket this context is tracking.
    @lock ctx.map_sock_wrkr begin
        foreach(close, keys(ctx.map_sock_wrkr[]))
    end
end

const CTX = ScopedValue(ClusterContext())

include("precompile.jl")

function __init__()
init_parallel()
Expand All @@ -219,13 +249,14 @@ function __init__()
# cluster cookie has been set, which is most likely to have been done
# through Distributed.init_multi() being called by Distributed.addprocs() or
# something.
CTX.stdlib_watcher_timer[] = Timer(0; interval=1) do timer
CTX[].stdlib_watcher_timer = Timer(0; interval=1) do timer
if _check_distributed_active()
close(timer)
end
end
atexit(() -> close(CTX.stdlib_watcher_timer[]))
end

atexit(() -> close(CTX[]))
end

end
Loading
Loading