diff --git a/Project.toml b/Project.toml index f83e756..c5c58c8 100644 --- a/Project.toml +++ b/Project.toml @@ -3,6 +3,7 @@ uuid = "fab6aee4-877b-4bac-a744-3eca44acbb6f" version = "1.2.0" [deps] +PrecompileTools = "aea7be01-6a6a-4083-8856-8a6e6704d82a" Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c" ScopedValues = "7e506255-f358-4e82-b7e4-beb19740aa63" Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b" @@ -19,6 +20,7 @@ Aqua = "0.8" Distributed = "1" LibSSH = "0.7" LinearAlgebra = "1" +PrecompileTools = "1" Random = "1" Revise = "3.7.0" ScopedValues = "1.6.0" diff --git a/docs/src/_changelog.md b/docs/src/_changelog.md index 895cb89..478d897 100644 --- a/docs/src/_changelog.md +++ b/docs/src/_changelog.md @@ -14,6 +14,7 @@ This documents notable changes in DistributedNext.jl. The format is based on single struct ([#61]). This should not be a user-visible change, but of course it's possible that some things slipped through the cracks so please open an issue if you encounter any bugs. +- A precompilation workload was added to improve TTFX ([#62]). ## [v1.2.0] - 2026-03-21 diff --git a/ext/ReviseExt.jl b/ext/ReviseExt.jl index 269add5..c41b4a6 100644 --- a/ext/ReviseExt.jl +++ b/ext/ReviseExt.jl @@ -3,8 +3,11 @@ module ReviseExt import DistributedNext import DistributedNext: myid, workers, remotecall -import Revise +using PrecompileTools: @recompile_invalidations +# Sadly Revise causes quite a few invalidations. TODO: make DistributedNext more +# resistant to invalidations. +@recompile_invalidations import Revise struct DistributedNextWorker <: Revise.AbstractWorker id::Int diff --git a/src/DistributedNext.jl b/src/DistributedNext.jl index 8dc7ebe..dcac2cd 100644 --- a/src/DistributedNext.jl +++ b/src/DistributedNext.jl @@ -88,7 +88,7 @@ function _check_distributed_active() return false end - if isdefined(Base.loaded_modules[distributed_pkgid].LPROC, :cookie) && CTX.inited[] + if isdefined(Base.loaded_modules[distributed_pkgid].LPROC, :cookie) && CTX[].inited[] @warn "DistributedNext has detected that the Distributed stdlib may be in use. Be aware that these libraries are not compatible, you should use either one or the other." return true else @@ -123,7 +123,7 @@ Base.lock(l::Lockable) = lock(l.lock) Base.trylock(l::Lockable) = trylock(l.lock) Base.unlock(l::Lockable) = unlock(l.lock) -next_ref_id() = Threads.atomic_add!(CTX.ref_id, 1) +next_ref_id() = Threads.atomic_add!(CTX[].ref_id, 1) struct RRID whence::Int @@ -146,12 +146,11 @@ include("macros.jl") # @spawn and friends include("workerpool.jl") include("pmap.jl") include("managers.jl") # LocalManager and SSHManager -include("precompile.jl") # Bundles all mutable global state for a distributed cluster into a single -# object. Currently a single global instance (`CTX`) is used, but multiple -# independent clusters could be supported in the future. -@kwdef struct ClusterContext +# object. The active context is accessed via the `CTX` ScopedValue, allowing +# multiple independent clusters to coexist in different task scopes. 
+@kwdef mutable struct ClusterContext # Process identity lproc::LocalProcess = LocalProcess() role::Ref{Symbol} = Ref{Symbol}(:master) @@ -204,11 +203,42 @@ include("precompile.jl") # Scoped value for exited callback pid exited_callback_pid::ScopedValue{Int} = ScopedValue(-1) + # GC messages task + shutting_down::Threads.Atomic{Bool} = Threads.Atomic{Bool}(false) + gc_msgs_task::Union{Task, Nothing} = nothing + # Stdlib watcher - stdlib_watcher_timer::Ref{Union{Timer, Nothing}} = Ref{Union{Timer, Nothing}}(nothing) + stdlib_watcher_timer::Union{Timer, Nothing} = nothing +end + +function ClusterContext(f::Base.Callable; kwargs...) + ctx = ClusterContext(; kwargs...) + ret = @with CTX => ctx f() + close(ctx) + + return ret end -const CTX = ClusterContext() +function Base.close(ctx::ClusterContext) + ctx.shutting_down[] = true + if !isnothing(ctx.gc_msgs_task) + @lock ctx.any_gc_flag notify(ctx.any_gc_flag) + wait(ctx.gc_msgs_task::Task) + end + + if !isnothing(ctx.stdlib_watcher_timer) + close(ctx.stdlib_watcher_timer::Timer) + end + + # Close all tracked sockets + @lock ctx.map_sock_wrkr for sock in keys(ctx.map_sock_wrkr[]) + close(sock) + end +end + +const CTX = ScopedValue(ClusterContext()) + +include("precompile.jl") function __init__() init_parallel() @@ -219,13 +249,14 @@ function __init__() # cluster cookie has been set, which is most likely to have been done # through Distributed.init_multi() being called by Distributed.addprocs() or # something. - CTX.stdlib_watcher_timer[] = Timer(0; interval=1) do timer + CTX[].stdlib_watcher_timer = Timer(0; interval=1) do timer if _check_distributed_active() close(timer) end end - atexit(() -> close(CTX.stdlib_watcher_timer[])) end + + atexit(() -> close(CTX[])) end end diff --git a/src/cluster.jl b/src/cluster.jl index 11d1a76..56477db 100644 --- a/src/cluster.jl +++ b/src/cluster.jl @@ -10,7 +10,7 @@ Cluster managers implement how workers can be added, removed and communicated wi abstract type ClusterManager end function throw_if_cluster_manager_unassigned() - isassigned(CTX.cluster_manager) || error("cluster_manager is unassigned") + isassigned(CTX[].cluster_manager) || error("cluster_manager is unassigned") return nothing end @@ -144,8 +144,8 @@ mutable struct Worker Worker(id::Int) = Worker(id, nothing) function Worker(id::Int, conn_func) @assert id > 0 - @lock CTX.map_pid_wrkr if haskey(CTX.map_pid_wrkr[], id) - return CTX.map_pid_wrkr[][id] + @lock CTX[].map_pid_wrkr if haskey(CTX[].map_pid_wrkr[], id) + return CTX[].map_pid_wrkr[][id] end w=new(id, Threads.ReentrantLock(), [], [], false, WorkerState_created, Threads.Condition(), time(), conn_func) w.initialized = Event() @@ -173,12 +173,12 @@ end function check_worker_state(w::Worker) if (@atomic w.state) === WorkerState_created if !isclusterlazy() - if CTX.pgrp.topology === :all_to_all + if CTX[].pgrp.topology === :all_to_all # Since higher pids connect with lower pids, the remote worker # may not have connected to us yet. Wait for some time. wait_for_conn(w) else - error("peer $(w.id) is not connected to $(myid()). Topology : " * string(CTX.pgrp.topology)) + error("peer $(w.id) is not connected to $(myid()). Topology : " * string(CTX[].pgrp.topology)) end else w.ct_time = time() @@ -231,6 +231,7 @@ mutable struct LocalProcess bind_port_hint::Int bind_port::Int cookie::String + in_process::Bool # true when running as a task, not a separate OS process LocalProcess() = new(1) end @@ -260,7 +261,7 @@ should not be relied upon to always pick the fastest interface. It does not return. 
""" start_worker(cookie::AbstractString=readline(stdin); kwargs...) = start_worker(stdout, cookie; kwargs...) -function start_worker(out::IO, cookie::AbstractString=readline(stdin); close_stdin::Bool=true, stderr_to_stdout::Bool=true) +function start_worker(out::IO, cookie::AbstractString=readline(stdin); close_stdin::Bool=true, stderr_to_stdout::Bool=true, exit_on_close::Bool=true) init_multi() if close_stdin # workers will not use it @@ -270,20 +271,27 @@ function start_worker(out::IO, cookie::AbstractString=readline(stdin); close_std stderr_to_stdout && redirect_stderr(stdout) init_worker(cookie) - interface = IPv4(CTX.lproc.bind_addr) - if CTX.lproc.bind_port == 0 - (port, sock) = listenany(interface, CTX.lproc.bind_port_hint) - CTX.lproc.bind_port = Int(port) + interface = IPv4(CTX[].lproc.bind_addr) + if CTX[].lproc.bind_port == 0 + (port, sock) = listenany(interface, CTX[].lproc.bind_port_hint) + CTX[].lproc.bind_port = Int(port) else - sock = listen(interface, CTX.lproc.bind_port) + sock = listen(interface, CTX[].lproc.bind_port) end errormonitor(@async while isopen(sock) - client = accept(sock) - process_messages(client, client, true) + try + client = accept(sock) + process_messages(client, client, true) + catch ex + # An IOError is thrown when the socket is closed + if !(ex isa Base.IOError) + rethrow() + end + end end) print(out, "julia_worker:") # print header - print(out, "$(CTX.lproc.bind_port)#") # print port - print(out, CTX.lproc.bind_addr) + print(out, "$(CTX[].lproc.bind_port)#") # print port + print(out, CTX[].lproc.bind_addr) print(out, '\n') flush(out) @@ -298,13 +306,17 @@ function start_worker(out::IO, cookie::AbstractString=readline(stdin); close_std # To prevent hanging processes on remote machines, newly launched workers exit if the # master process does not connect in time. check_master_connect() - while true; wait(); end + while isopen(out) + sleep(0.1) + end catch err print(stderr, "unhandled exception on $(myid()): $(err)\nexiting.\n") end close(sock) - exit(0) + if exit_on_close + exit(0) + end end @@ -404,16 +416,16 @@ function init_worker(cookie::AbstractString, manager::ClusterManager=DefaultClus # On workers, the default cluster manager connects via TCP sockets. Custom # transports will need to call this function with their own manager. - CTX.cluster_manager[] = manager + CTX[].cluster_manager[] = manager # Since our pid has yet to be set, ensure no RemoteChannel / Future have been created or addprocs() called. @assert nprocs() <= 1 - @assert isempty(CTX.pgrp.refs) - @assert isempty(CTX.client_refs) + @assert isempty(CTX[].pgrp.refs) + @assert isempty(CTX[].client_refs) # System is started in head node mode, cleanup related entries - empty!(CTX.pgrp.workers) - @lock CTX.map_pid_wrkr empty!(CTX.map_pid_wrkr[]) + empty!(CTX[].pgrp.workers) + @lock CTX[].map_pid_wrkr empty!(CTX[].map_pid_wrkr[]) cluster_cookie(cookie) nothing @@ -476,14 +488,14 @@ function addprocs(manager::ClusterManager; kwargs...) 
# Call worker-starting callbacks warning_interval = params[:callback_warning_interval] - _run_callbacks_concurrently("worker-starting", CTX.worker_starting_callbacks, + _run_callbacks_concurrently("worker-starting", CTX[].worker_starting_callbacks, warning_interval, [(manager, params)]) # Add new workers - new_workers = @lock CTX.worker_lock addprocs_locked(manager::ClusterManager, params) + new_workers = @lock CTX[].worker_lock addprocs_locked(manager::ClusterManager, params) # Call worker-started callbacks - _run_callbacks_concurrently("worker-started", CTX.worker_started_callbacks, + _run_callbacks_concurrently("worker-started", CTX[].worker_started_callbacks, warning_interval, new_workers) return new_workers @@ -492,12 +504,12 @@ end function addprocs_locked(manager::ClusterManager, params) topology(Symbol(params[:topology])) - if CTX.pgrp.topology !== :all_to_all + if CTX[].pgrp.topology !== :all_to_all params[:lazy] = false end - if CTX.pgrp.lazy === nothing || nprocs() == 1 - CTX.pgrp.lazy = params[:lazy] + if CTX[].pgrp.lazy === nothing || nprocs() == 1 + CTX[].pgrp.lazy = params[:lazy] elseif isclusterlazy() != params[:lazy] throw(ArgumentError(string("Active workers with lazy=", isclusterlazy(), ". Cannot set lazy=", params[:lazy]))) @@ -527,7 +539,7 @@ function addprocs_locked(manager::ClusterManager, params) if isempty(launched) istaskdone(t_launch) && break @async begin - sleep(1) + sleep(0.1) notify(launch_ntfy) end wait(launch_ntfy) @@ -629,7 +641,7 @@ end function create_worker(manager::ClusterManager, wconfig::WorkerConfig) # only node 1 can add new nodes, since nobody else has the full list of address:port - @assert CTX.lproc.id == 1 + @assert CTX[].lproc.id == 1 timeout = worker_timeout() # initiate a connect. Does not wait for connection completion in case of TCP. @@ -678,11 +690,11 @@ function create_worker(manager::ClusterManager, wconfig::WorkerConfig) # - On master, receiving a JoinCompleteMsg triggers rr_ntfy_join (signifies that worker setup is complete) join_list = [] - if CTX.pgrp.topology === :all_to_all + if CTX[].pgrp.topology === :all_to_all # need to wait for lower worker pids to have completed connecting, since the numerical value # of pids is relevant to the connection process, i.e., higher pids connect to lower pids and they # require the value of config.connect_at which is set only upon connection completion - for jw in CTX.pgrp.workers + for jw in CTX[].pgrp.workers if (jw.id != 1) && (jw.id < w.id) lock(jw.c_state) do # wait for wl to join @@ -694,12 +706,12 @@ function create_worker(manager::ClusterManager, wconfig::WorkerConfig) end end - elseif CTX.pgrp.topology === :custom + elseif CTX[].pgrp.topology === :custom # wait for requested workers to be up before connecting to them. 
filterfunc(x) = (x.id != 1) && isdefined(x, :config) && (notnothing(x.config.ident) in something(wconfig.connect_idents, [])) - wlist = filter(filterfunc, CTX.pgrp.workers) + wlist = filter(filterfunc, CTX[].pgrp.workers) waittime = 0 while wconfig.connect_idents !== nothing && length(wlist) < length(wconfig.connect_idents) @@ -708,7 +720,7 @@ function create_worker(manager::ClusterManager, wconfig::WorkerConfig) end sleep(1.0) waittime += 1 - wlist = filter(filterfunc, CTX.pgrp.workers) + wlist = filter(filterfunc, CTX[].pgrp.workers) end for wl in wlist @@ -728,7 +740,7 @@ function create_worker(manager::ClusterManager, wconfig::WorkerConfig) join_list) send_connection_hdr(w, true) enable_threaded_blas = something(wconfig.enable_threaded_blas, false) - join_message = JoinPGRPMsg(w.id, all_locs, CTX.pgrp.topology, enable_threaded_blas, isclusterlazy()) + join_message = JoinPGRPMsg(w.id, all_locs, CTX[].pgrp.topology, enable_threaded_blas, isclusterlazy()) send_msg_now(w, MsgHeader(RRID(0,0), ntfy_oid), join_message) errormonitor(@async manage(w.manager, w.id, w.config, :register)) @@ -737,8 +749,8 @@ function create_worker(manager::ClusterManager, wconfig::WorkerConfig) if timedwait(() -> isready(rr_ntfy_join), timeout) === :timed_out error("worker did not connect within $timeout seconds") end - lock(CTX.client_refs) do - delete!(CTX.pgrp.refs, ntfy_oid) + lock(CTX[].client_refs) do + delete!(CTX[].pgrp.refs, ntfy_oid) end return w.id @@ -806,7 +818,7 @@ function check_master_connect() errormonitor( Threads.@spawn begin timeout = worker_timeout() - if timedwait(() -> @lock(CTX.map_pid_wrkr, haskey(CTX.map_pid_wrkr[], 1)), timeout) === :timed_out + if timedwait(() -> @lock(CTX[].map_pid_wrkr, haskey(CTX[].map_pid_wrkr[], 1)), timeout) === :timed_out print(stderr, "Master process (id 1) could not connect within $(timeout) seconds.\nexiting.\n") exit(1) end @@ -820,7 +832,7 @@ end Return the cluster cookie. """ -cluster_cookie() = (init_multi(); CTX.lproc.cookie) +cluster_cookie() = (init_multi(); CTX[].lproc.cookie) """ cluster_cookie(cookie) -> cookie @@ -835,12 +847,12 @@ function cluster_cookie(cookie) cookie = rpad(cookie, HDR_COOKIE_LEN) - CTX.lproc.cookie = cookie + CTX[].lproc.cookie = cookie cookie end # Note that atomic_add!() returns the old value, which is what we want -get_next_pid() = Threads.atomic_add!(CTX.next_pid, 1) +get_next_pid() = Threads.atomic_add!(CTX[].next_pid, 1) mutable struct ProcessGroup name::String @@ -854,18 +866,18 @@ end function topology(t) @assert t in [:all_to_all, :master_worker, :custom] - if (CTX.pgrp.topology==t) || ((myid()==1) && (nprocs()==1)) || (myid() > 1) - CTX.pgrp.topology = t + if (CTX[].pgrp.topology==t) || ((myid()==1) && (nprocs()==1)) || (myid() > 1) + CTX[].pgrp.topology = t else - error("Workers with Topology $(CTX.pgrp.topology) already exist. Requested Topology $(t) cannot be set.") + error("Workers with Topology $(CTX[].pgrp.topology) already exist. 
Requested Topology $(t) cannot be set.") end t end -isclusterlazy() = something(CTX.pgrp.lazy, false) +isclusterlazy() = something(CTX[].pgrp.lazy, false) get_bind_addr(pid::Integer) = get_bind_addr(worker_from_id(pid)) -get_bind_addr(w::LocalProcess) = CTX.lproc.bind_addr +get_bind_addr(w::LocalProcess) = CTX[].lproc.bind_addr function get_bind_addr(w::Worker) if w.config.bind_addr === nothing if w.id != myid() @@ -879,9 +891,9 @@ const HDR_VERSION_LEN = 16 const HDR_COOKIE_LEN = 16 # whether process is a master or worker in a distributed setup -myrole() = CTX.role[] +myrole() = CTX[].role[] function myrole!(proctype::Symbol) - CTX.role[] = proctype + CTX[].role[] = proctype end # Callbacks @@ -960,14 +972,14 @@ try to either keep the callbacks fast to execute, or do the actual work asynchronously by spawning a task in the callback (beware of race conditions if you do this). """ -add_worker_starting_callback(f::Base.Callable; key=nothing) = _add_callback(f, key, CTX.worker_starting_callbacks; +add_worker_starting_callback(f::Base.Callable; key=nothing) = _add_callback(f, key, CTX[].worker_starting_callbacks; arg_types=Tuple{ClusterManager, Dict}) """ remove_worker_starting_callback(key) Remove the callback for `key` that was added with [`add_worker_starting_callback()`](@ref). """ -remove_worker_starting_callback(key) = _remove_callback(key, CTX.worker_starting_callbacks) +remove_worker_starting_callback(key) = _remove_callback(key, CTX[].worker_starting_callbacks) """ add_worker_started_callback(f::Base.Callable; key=nothing) -> key @@ -985,14 +997,14 @@ try to either keep the callbacks fast to execute, or do the actual initialization asynchronously by spawning a task in the callback (beware of race conditions if you do this). """ -add_worker_started_callback(f::Base.Callable; key=nothing) = _add_callback(f, key, CTX.worker_started_callbacks) +add_worker_started_callback(f::Base.Callable; key=nothing) = _add_callback(f, key, CTX[].worker_started_callbacks) """ remove_worker_started_callback(key) Remove the callback for `key` that was added with [`add_worker_started_callback()`](@ref). """ -remove_worker_started_callback(key) = _remove_callback(key, CTX.worker_started_callbacks) +remove_worker_started_callback(key) = _remove_callback(key, CTX[].worker_started_callbacks) """ add_worker_exiting_callback(f::Base.Callable; key=nothing) -> key @@ -1006,14 +1018,14 @@ All worker-exiting callbacks will be executed concurrently and if they don't all finish before the `callback_timeout` passed to `rmprocs()` then the worker will be removed anyway. """ -add_worker_exiting_callback(f::Base.Callable; key=nothing) = _add_callback(f, key, CTX.worker_exiting_callbacks) +add_worker_exiting_callback(f::Base.Callable; key=nothing) = _add_callback(f, key, CTX[].worker_exiting_callbacks) """ remove_worker_exiting_callback(key) Remove the callback for `key` that was added with [`add_worker_exiting_callback()`](@ref). """ -remove_worker_exiting_callback(key) = _remove_callback(key, CTX.worker_exiting_callbacks) +remove_worker_exiting_callback(key) = _remove_callback(key, CTX[].worker_exiting_callbacks) """ add_worker_exited_callback(f::Base.Callable; key=nothing) -> key @@ -1031,7 +1043,7 @@ of `WorkerState_exterminated` means the worker died unexpectedly. All worker-exited callbacks will be executed concurrently. If a callback throws an exception it will be caught and printed. 
""" -add_worker_exited_callback(f::Base.Callable; key=nothing) = _add_callback(f, key, CTX.worker_exited_callbacks; +add_worker_exited_callback(f::Base.Callable; key=nothing) = _add_callback(f, key, CTX[].worker_exited_callbacks; arg_types=Tuple{Int, WorkerState}) """ @@ -1039,7 +1051,7 @@ add_worker_exited_callback(f::Base.Callable; key=nothing) = _add_callback(f, key Remove the callback for `key` that was added with [`add_worker_exited_callback()`](@ref). """ -remove_worker_exited_callback(key) = _remove_callback(key, CTX.worker_exited_callbacks) +remove_worker_exited_callback(key) = _remove_callback(key, CTX[].worker_exited_callbacks) # cluster management related API """ @@ -1056,7 +1068,7 @@ julia> remotecall_fetch(() -> myid(), 4) 4 ``` """ -myid() = CTX.lproc.id +myid() = CTX[].lproc.id """ nprocs() @@ -1075,17 +1087,18 @@ julia> workers() ``` """ function nprocs() - if myid() == 1 || (CTX.pgrp.topology === :all_to_all && !isclusterlazy()) - n = length(CTX.pgrp.workers) + ctx = CTX[] + if myid() == 1 || (ctx.pgrp.topology === :all_to_all && !isclusterlazy()) + n = length(ctx.pgrp.workers) # filter out workers in the process of being setup/shutdown. - for jw in CTX.pgrp.workers + for jw in ctx.pgrp.workers if !isa(jw, LocalProcess) && ((@atomic jw.state) !== WorkerState_connected) n = n - 1 end end return n else - return length(CTX.pgrp.workers) + return length(ctx.pgrp.workers) end end @@ -1130,11 +1143,12 @@ julia> procs() ``` """ function procs() - if myid() == 1 || (CTX.pgrp.topology === :all_to_all && !isclusterlazy()) + ctx = CTX[] + if myid() == 1 || (ctx.pgrp.topology === :all_to_all && !isclusterlazy()) # filter out workers in the process of being setup/shutdown. - return Int[x.id for x in CTX.pgrp.workers if isa(x, LocalProcess) || ((@atomic x.state) === WorkerState_connected)] + return Int[x.id for x in ctx.pgrp.workers if isa(x, LocalProcess) || ((@atomic x.state) === WorkerState_connected)] else - return Int[x.id for x in CTX.pgrp.workers] + return Int[x.id for x in ctx.pgrp.workers] end end @@ -1147,14 +1161,15 @@ current worker is filtered out. other_procs() = filter(!=(myid()), procs()) function id_in_procs(id) # faster version of `id in procs()` - if myid() == 1 || (CTX.pgrp.topology === :all_to_all && !isclusterlazy()) - for x in CTX.pgrp.workers + ctx = CTX[] + if myid() == 1 || (ctx.pgrp.topology === :all_to_all && !isclusterlazy()) + for x in ctx.pgrp.workers if (x.id::Int) == id && (isa(x, LocalProcess) || (@atomic (x::Worker).state) === WorkerState_connected) return true end end else - for x in CTX.pgrp.workers + for x in ctx.pgrp.workers if (x.id::Int) == id return true end @@ -1172,9 +1187,10 @@ Specifically all workers bound to the same ip-address as `pid` are returned. See also [`other_procs()`](@ref). 
""" function procs(pid::Integer) + ctx = CTX[] if myid() == 1 - all_workers = [x for x in CTX.pgrp.workers if isa(x, LocalProcess) || ((@atomic x.state) === WorkerState_connected)] - if (pid == 1) || (isa(@lock(CTX.map_pid_wrkr, CTX.map_pid_wrkr[][pid].manager), LocalManager)) + all_workers = [x for x in ctx.pgrp.workers if isa(x, LocalProcess) || ((@atomic x.state) === WorkerState_connected)] + if (pid == 1) || (isa(@lock(ctx.map_pid_wrkr, ctx.map_pid_wrkr[][pid].manager), LocalManager)) Int[x.id for x in filter(w -> (w.id==1) || (isa(w.manager, LocalManager)), all_workers)] else ipatpid = get_bind_addr(pid) @@ -1270,8 +1286,8 @@ function setstatus!(x, mod::Module, pid::Int=myid()) end if myid() == 1 - @lock CTX.map_pid_statuses begin - statuses = get!(CTX.map_pid_statuses[], pid, Dict{Module, Any}()) + @lock CTX[].map_pid_statuses begin + statuses = get!(CTX[].map_pid_statuses[], pid, Dict{Module, Any}()) statuses[mod] = x end else @@ -1280,8 +1296,8 @@ function setstatus!(x, mod::Module, pid::Int=myid()) end function _getstatus(pid, mod) - @lock CTX.map_pid_statuses begin - statuses = get(CTX.map_pid_statuses[], pid, nothing) + @lock CTX[].map_pid_statuses begin + statuses = get(CTX[].map_pid_statuses[], pid, nothing) isnothing(statuses) ? nothing : get(statuses, mod, nothing) end end @@ -1315,7 +1331,7 @@ function getstatus(mod::Module, pid::Int=myid()) # During the worker-exited callbacks this function may be called, at which # point it will not exist in procs(). Thus we check whether the function is # being called for an exited worker and allow it if so. - if !id_in_procs(pid) && CTX.exited_callback_pid[] != pid + if !id_in_procs(pid) && CTX[].exited_callback_pid[] != pid throw(ArgumentError("Worker $(pid) does not exist, cannot get its status")) end @@ -1383,12 +1399,12 @@ function rmprocs(pids...; waitfor=typemax(Int), callback_timeout=10) end function _rmprocs(pids, waitfor, callback_timeout) - lock(CTX.worker_lock) + lock(CTX[].worker_lock) try # Run the callbacks callback_tasks = Tuple{Any, Task}[] for pid in pids - for (name, callback) in CTX.worker_exiting_callbacks + for (name, callback) in CTX[].worker_exiting_callbacks push!(callback_tasks, (name, Threads.@spawn callback(pid))) end end @@ -1404,7 +1420,7 @@ function _rmprocs(pids, waitfor, callback_timeout) if p == 1 @warn "rmprocs: process 1 not removed" else - w = @lock CTX.map_pid_wrkr get(CTX.map_pid_wrkr[], p, nothing) + w = @lock CTX[].map_pid_wrkr get(CTX[].map_pid_wrkr[], p, nothing) if !isnothing(w) set_worker_state(w, WorkerState_terminating) kill(w.manager, p, w.config) @@ -1425,7 +1441,7 @@ function _rmprocs(pids, waitfor, callback_timeout) throw(ErrorException(estr)) end finally - unlock(CTX.worker_lock) + unlock(CTX[].worker_lock) end end @@ -1443,19 +1459,19 @@ end # No-arg constructor added for compatibility with Julia 1.0 & 1.1, should be deprecated in the future ProcessExitedException() = ProcessExitedException(-1) -worker_from_id(i) = worker_from_id(CTX.pgrp, i) +worker_from_id(i) = worker_from_id(CTX[].pgrp, i) function worker_from_id(pg::ProcessGroup, i) - @lock CTX.map_del_wrkr if !isempty(CTX.map_del_wrkr[]) && in(i, CTX.map_del_wrkr[]) + @lock CTX[].map_del_wrkr if !isempty(CTX[].map_del_wrkr[]) && in(i, CTX[].map_del_wrkr[]) throw(ProcessExitedException(i)) end - w = @lock CTX.map_pid_wrkr get(CTX.map_pid_wrkr[], i, nothing) + w = @lock CTX[].map_pid_wrkr get(CTX[].map_pid_wrkr[], i, nothing) if w === nothing if myid() == 1 error("no process with id $i exists") end w = Worker(i) - @lock 
CTX.map_pid_wrkr CTX.map_pid_wrkr[][i] = w + @lock CTX[].map_pid_wrkr CTX[].map_pid_wrkr[][i] = w else w = w::Union{Worker, LocalProcess} end @@ -1471,7 +1487,7 @@ This is useful when writing custom [`serialize`](@ref) methods for a type, which optimizes the data written out depending on the receiving process id. """ function worker_id_from_socket(s) - w = @lock CTX.map_sock_wrkr get(CTX.map_sock_wrkr[], s, nothing) + w = @lock CTX[].map_sock_wrkr get(CTX[].map_sock_wrkr[], s, nothing) if isa(w,Worker) if s === w.r_stream || s === w.w_stream return w.id @@ -1485,30 +1501,30 @@ function worker_id_from_socket(s) end -register_worker(w) = register_worker(CTX.pgrp, w) +register_worker(w) = register_worker(CTX[].pgrp, w) function register_worker(pg, w) push!(pg.workers, w) - @lock CTX.map_pid_wrkr CTX.map_pid_wrkr[][w.id] = w + @lock CTX[].map_pid_wrkr CTX[].map_pid_wrkr[][w.id] = w end function register_worker_streams(w) - @lock CTX.map_sock_wrkr begin - CTX.map_sock_wrkr[][w.r_stream] = w - CTX.map_sock_wrkr[][w.w_stream] = w + @lock CTX[].map_sock_wrkr begin + CTX[].map_sock_wrkr[][w.r_stream] = w + CTX[].map_sock_wrkr[][w.w_stream] = w end end -deregister_worker(pid) = deregister_worker(CTX.pgrp, pid) +deregister_worker(pid) = deregister_worker(CTX[].pgrp, pid) function deregister_worker(pg, pid) pg.workers = filter(x -> !(x.id == pid), pg.workers) - w = @lock CTX.map_pid_wrkr pop!(CTX.map_pid_wrkr[], pid, nothing) + w = @lock CTX[].map_pid_wrkr pop!(CTX[].map_pid_wrkr[], pid, nothing) if isa(w, Worker) if isdefined(w, :r_stream) - @lock CTX.map_sock_wrkr begin - pop!(CTX.map_sock_wrkr[], w.r_stream, nothing) + @lock CTX[].map_sock_wrkr begin + pop!(CTX[].map_sock_wrkr[], w.r_stream, nothing) if w.r_stream != w.w_stream - pop!(CTX.map_sock_wrkr[], w.w_stream, nothing) + pop!(CTX[].map_sock_wrkr[], w.w_stream, nothing) end end end @@ -1516,7 +1532,7 @@ function deregister_worker(pg, pid) if myid() == 1 && (myrole() === :master) && isdefined(w, :config) # Notify the cluster manager of this workers death manage(w.manager, w.id, w.config, :deregister) - if CTX.pgrp.topology !== :all_to_all || isclusterlazy() + if CTX[].pgrp.topology !== :all_to_all || isclusterlazy() for rpid in other_workers() try remote_do(deregister_worker, rpid, pid) @@ -1526,12 +1542,12 @@ function deregister_worker(pg, pid) end end end - @lock CTX.map_del_wrkr push!(CTX.map_del_wrkr[], pid) + @lock CTX[].map_del_wrkr push!(CTX[].map_del_wrkr[], pid) # delete this worker from our remote reference client sets ids = [] tonotify = [] - lock(CTX.client_refs) do + lock(CTX[].client_refs) do for (id, rv) in pg.refs if in(pid, rv.clientset) push!(ids, id) @@ -1560,13 +1576,13 @@ function deregister_worker(pg, pid) # pid check. We go to some effort to make sure this works after # deregistering the worker because if it's called beforehand the worker # will incorrectly be shown in e.g. procs(). 
- @with CTX.exited_callback_pid => pid begin - _run_callbacks_concurrently("worker-exited", CTX.worker_exited_callbacks, + @with CTX[].exited_callback_pid => pid begin + _run_callbacks_concurrently("worker-exited", CTX[].worker_exited_callbacks, warning_interval, [(pid, w.state)]; catch_exceptions=true) end # Delete its statuses - @lock CTX.map_pid_statuses delete!(CTX.map_pid_statuses[], pid) + @lock CTX[].map_pid_statuses delete!(CTX[].map_pid_statuses[], pid) end return @@ -1575,7 +1591,7 @@ end function interrupt(pid::Integer) @assert myid() == 1 - w = @lock CTX.map_pid_wrkr CTX.map_pid_wrkr[][pid] + w = @lock CTX[].map_pid_wrkr CTX[].map_pid_wrkr[][pid] if isa(w, Worker) manage(w.manager, w.id, w.config, :interrupt) end @@ -1615,11 +1631,11 @@ function check_same_host(pids) # We checkfirst if all test pids have been started using the local manager, # else we check for the same bind_to addr. This handles the special case # where the local ip address may change - as during a system sleep/awake - @lock CTX.map_pid_wrkr if all(p -> (p==1) || (isa(CTX.map_pid_wrkr[][p].manager, LocalManager)), pids) + @lock CTX[].map_pid_wrkr if all(p -> (p==1) || (isa(CTX[].map_pid_wrkr[][p].manager, LocalManager)), pids) return true else - first_bind_addr = notnothing(wp_bind_addr(CTX.map_pid_wrkr[][pids[1]])) - return all(p -> notnothing(wp_bind_addr(CTX.map_pid_wrkr[][p])) == first_bind_addr, pids[2:end]) + first_bind_addr = notnothing(wp_bind_addr(CTX[].map_pid_wrkr[][pids[1]])) + return all(p -> notnothing(wp_bind_addr(CTX[].map_pid_wrkr[][p])) == first_bind_addr, pids[2:end]) end end end @@ -1687,16 +1703,16 @@ function init_bind_addr() end end - CTX.lproc.bind_addr = bind_addr - CTX.lproc.bind_port = bind_port - CTX.lproc.bind_port_hint = bind_port_hint + CTX[].lproc.bind_addr = bind_addr + CTX[].lproc.bind_port = bind_port + CTX[].lproc.bind_port_hint = bind_port_hint end using Random: randstring # do initialization that's only needed when there is more than 1 processor function init_multi() - if !Threads.atomic_cas!(CTX.inited, false, true) + if !Threads.atomic_cas!(CTX[].inited, false, true) push!(Base.package_callbacks, _require_callback) atexit(terminate_all_workers) init_bind_addr() @@ -1706,12 +1722,12 @@ function init_multi() end function init_parallel() - start_gc_msgs_task() + CTX[].gc_msgs_task = start_gc_msgs_task() # start in "head node" mode, if worker, will override later. - CTX.lproc.id = 1 - @assert isempty(CTX.pgrp.workers) - register_worker(CTX.lproc) + CTX[].lproc.id = 1 + @assert isempty(CTX[].pgrp.workers) + register_worker(CTX[].lproc) end write_cookie(io::IO) = print(io.in, string(cluster_cookie(), "\n")) diff --git a/src/clusterserialize.jl b/src/clusterserialize.jl index 9f96abe..29496dd 100644 --- a/src/clusterserialize.jl +++ b/src/clusterserialize.jl @@ -28,26 +28,28 @@ end ClusterSerializer(io::IO) = ClusterSerializer{typeof(io)}(io) function object_number(s::ClusterSerializer, @nospecialize(l)) - if haskey(CTX.object_numbers, l) - return CTX.object_numbers[l] + ctx = CTX[] + if haskey(ctx.object_numbers, l) + return ctx.object_numbers[l] end # a hash function that always gives the same number to the same # object on the same machine, and is unique over all machines. 
- ln = CTX.obj_number_salt[]+(UInt64(myid())<<44) - CTX.obj_number_salt[] += 1 - CTX.object_numbers[l] = ln + ln = ctx.obj_number_salt[]+(UInt64(myid())<<44) + ctx.obj_number_salt[] += 1 + ctx.object_numbers[l] = ln return ln::UInt64 end function lookup_object_number(s::ClusterSerializer, n::UInt64) - return get(CTX.known_object_data, n, nothing) + return get(CTX[].known_object_data, n, nothing) end function remember_object(s::ClusterSerializer, @nospecialize(o), n::UInt64) - CTX.known_object_data[n] = o - if isa(o, Core.TypeName) && !haskey(CTX.object_numbers, o) + ctx = CTX[] + ctx.known_object_data[n] = o + if isa(o, Core.TypeName) && !haskey(ctx.object_numbers, o) # set up reverse mapping for serialize - CTX.object_numbers[o] = n + ctx.object_numbers[o] = n end return nothing end diff --git a/src/macros.jl b/src/macros.jl index 30f87a8..a4dc30a 100644 --- a/src/macros.jl +++ b/src/macros.jl @@ -1,7 +1,7 @@ # This file is a part of Julia. License is MIT: https://julialang.org/license function nextproc() - idx = Threads.atomic_add!(CTX.next_worker_idx, 1) + idx = Threads.atomic_add!(CTX[].next_worker_idx, 1) return workers()[(idx % nworkers()) + 1] end diff --git a/src/managers.jl b/src/managers.jl index ce9f917..a65ebab 100644 --- a/src/managers.jl +++ b/src/managers.jl @@ -415,7 +415,7 @@ function manage(manager::SSHManager, id::Integer, config::WorkerConfig, op::Symb end # This is defined such that the port numbers start at 9201 and wrap around at 32,000 -next_tunnel_port() = (Threads.atomic_add!(CTX.tunnel_counter, 1) % 22_800) + 9200 +next_tunnel_port() = (Threads.atomic_add!(CTX[].tunnel_counter, 1) % 22_800) + 9200 """ @@ -458,8 +458,10 @@ end # LocalManager struct LocalManager <: ClusterManager np::Int - restrict::Bool # Restrict binding to 127.0.0.1 only + restrict::Bool # Restrict binding to 127.0.0.1 only + in_process::Bool # Run workers as tasks in the same process (internal) end +LocalManager(np, restrict) = LocalManager(np, restrict, false) """ addprocs(np::Integer=Sys.CPU_THREADS; restrict=true, kwargs...) -> List of process identifiers @@ -495,7 +497,7 @@ function launch(manager::LocalManager, params::Dict, launched::Array, c::Conditi dir = params[:dir] exename = params[:exename] exeflags = params[:exeflags] - bind_to = manager.restrict ? `127.0.0.1` : `$(CTX.lproc.bind_addr)` + bind_to = manager.restrict ? 
`127.0.0.1` : `$(CTX[].lproc.bind_addr)`
     env = Dict{String,String}(params[:env])
     # TODO: Maybe this belongs in base/initdefs.jl as a package_environment() function
@@ -527,13 +529,30 @@ function launch(manager::LocalManager, params::Dict, launched::Array, c::Conditi
     end

     for i in 1:manager.np
-        cmd = `$(julia_cmd(exename)) $exeflags --bind-to $bind_to $(get_worker_arg())`
-        io = open(detach(setenv(addenv(cmd, env), dir=dir)), "r+")
-        write_cookie(io)
-        wconfig = WorkerConfig()
-        wconfig.process = io
-        wconfig.io = io.out
+
+        wconfig = WorkerConfig()
+        if manager.in_process
+            cookie = cluster_cookie()
+            worker_ctx = ClusterContext()
+            worker_ctx.lproc.in_process = true
+            pipe = Pipe()
+            Base.link_pipe!(pipe; reader_supports_async=true, writer_supports_async=true)
+
+            task = Threads.@spawn @with CTX => worker_ctx begin
+                start_worker(pipe.in, cookie; close_stdin=false, stderr_to_stdout=false, exit_on_close=false)
+            end
+
+            wconfig.io = pipe.out
+            wconfig.userdata = (; ctx=worker_ctx, task, pipe)
+        else
+            cmd = `$(julia_cmd(exename)) $exeflags --bind-to $bind_to $(get_worker_arg())`
+            proc = open(detach(setenv(addenv(cmd, env), dir=dir)), "r+")
+
+            write_cookie(proc)
+            wconfig.io = proc.out
+            wconfig.process = proc
+        end
+
+        wconfig.enable_threaded_blas = params[:enable_threaded_blas]
         push!(launched, wconfig)
     end
@@ -542,7 +561,7 @@ function launch(manager::LocalManager, params::Dict, launched::Array, c::Conditi
 end

 function manage(manager::LocalManager, id::Integer, config::WorkerConfig, op::Symbol)
-    if op === :interrupt
+    if op === :interrupt && !manager.in_process
         kill(config.process::Process, 2)
     end
 end
@@ -622,10 +641,10 @@ function connect(manager::ClusterManager, pid::Int, config::WorkerConfig)
     end

     if tunnel
-        if !haskey(CTX.tunnel_hosts_map, pubhost)
-            CTX.tunnel_hosts_map[pubhost] = Semaphore(something(config.max_parallel, typemax(Int)))
+        if !haskey(CTX[].tunnel_hosts_map, pubhost)
+            CTX[].tunnel_hosts_map[pubhost] = Semaphore(something(config.max_parallel, typemax(Int)))
         end
-        sem = CTX.tunnel_hosts_map[pubhost]
+        sem = CTX[].tunnel_hosts_map[pubhost]

         sshflags = notnothing(config.sshflags)
         multiplex = something(config.multiplex, false)
@@ -690,9 +709,9 @@ end

 function bind_client_port(sock::TCPSocket, iptype)
     bind_host = iptype(0)
-    if Sockets.bind(sock, bind_host, CTX.client_port[])
+    if Sockets.bind(sock, bind_host, CTX[].client_port[])
         _addr, port = getsockname(sock)
-        CTX.client_port[] = port
+        CTX[].client_port[] = port
     end
     return sock
 end
@@ -756,16 +775,24 @@ function kill(manager::SSHManager, pid::Int, config::WorkerConfig)
 end

 function kill(manager::LocalManager, pid::Int, config::WorkerConfig; profile_wait = 6, exit_timeout = 15, term_timeout = 15)
+    if manager.in_process
+        (; ctx, task, pipe) = config.userdata
+        # Close the pipe, which causes the start_worker() task to exit
+        close(pipe)
+        wait(task)
+        close(ctx)
+        return nothing
+    end
+
     # profile_wait = 6 is 1s for profile, 5s for the report to show
     # First, try sending `exit()` to the remote over the usual control channels
     remote_do(exit, pid)

     timer_task = @async begin
-        sleep(exit_timeout)
+        process = config.process::Process

         # Check to see if our child exited, and if not, send an actual kill signal
-        process = config.process::Process
-        if !process_exited(process)
+        if timedwait(() -> process_exited(process), exit_timeout) === :timed_out
             @warn "Failed to gracefully kill worker $(pid)"

             profile_sig = Sys.iswindows() ? nothing : Sys.isbsd() ?
("SIGINFO", 29) : ("SIGUSR1" , 10) if profile_sig !== nothing diff --git a/src/messages.jl b/src/messages.jl index dcd473d..92c53df 100644 --- a/src/messages.jl +++ b/src/messages.jl @@ -193,7 +193,8 @@ end function flush_gc_msgs() try - for w in (CTX.pgrp::ProcessGroup).workers + ctx = CTX[] + for w in (ctx.pgrp::ProcessGroup).workers if isa(w,Worker) && ((@atomic w.state) == WorkerState_connected) && w.gcflag flush_gc_msgs(w) end @@ -209,7 +210,7 @@ function send_connection_hdr(w::Worker, cookie=true) # else when we initiate a connection we first send the cookie followed by our version. # The remote side validates the cookie. if cookie - write(w.w_stream, CTX.lproc.cookie) + write(w.w_stream, CTX[].lproc.cookie) end write(w.w_stream, rpad(VERSION_STRING, HDR_VERSION_LEN)[1:HDR_VERSION_LEN]) end diff --git a/src/precompile.jl b/src/precompile.jl index e3d592b..bd6dd13 100644 --- a/src/precompile.jl +++ b/src/precompile.jl @@ -1,40 +1,10 @@ -precompile(Tuple{typeof(DistributedNext.remotecall),Function,Int,Module,Vararg{Any, 100}}) -precompile(Tuple{typeof(DistributedNext.procs)}) -precompile(Tuple{typeof(DistributedNext.finalize_ref), DistributedNext.Future}) -precompile(Tuple{typeof(DistributedNext.setup_launched_worker), DistributedNext.LocalManager, DistributedNext.WorkerConfig, Vector{Int}}) -precompile(Tuple{typeof(DistributedNext.process_tcp_streams), Sockets.TCPSocket, Sockets.TCPSocket, Bool}) -precompile(Tuple{typeof(Serialization.serialize), DistributedNext.ClusterSerializer{Sockets.TCPSocket}, Array{Any, 1}}) -precompile(Tuple{typeof(DistributedNext.parse_connection_info), String}) -precompile(Tuple{typeof(DistributedNext._run_callbacks_concurrently), String, Base.Dict{Any, Union{Function, Type}}, Int64, Array{Tuple{DistributedNext.LocalManager, Base.Dict{Symbol, Any}}, 1}}) -precompile(Tuple{typeof(DistributedNext._run_callbacks_concurrently), String, Base.Dict{Any, Union{Function, Type}}, Int64, Array{Int64, 1}}) -precompile(Tuple{typeof(DistributedNext.read_worker_host_port), Base.PipeEndpoint}) -precompile(Tuple{typeof(Base.put!), Base.Channel{Any}, Int64}) -precompile(Tuple{typeof(Base.put!), Base.Channel{Any}, WeakRef}) +using PrecompileTools: @compile_workload -precompile(Tuple{typeof(DistributedNext.handle_msg), DistributedNext.IdentifySocketMsg, DistributedNext.MsgHeader, Sockets.TCPSocket, Sockets.TCPSocket, VersionNumber}) -precompile(Tuple{typeof(DistributedNext.handle_msg), DistributedNext.CallWaitMsg, DistributedNext.MsgHeader, Sockets.TCPSocket, Sockets.TCPSocket, VersionNumber}) -precompile(Tuple{typeof(DistributedNext.handle_msg), DistributedNext.CallMsg{:call}, DistributedNext.MsgHeader, Sockets.TCPSocket, Sockets.TCPSocket, VersionNumber}) -precompile(Tuple{typeof(DistributedNext.handle_msg), DistributedNext.CallMsg{:call_fetch}, DistributedNext.MsgHeader, Sockets.TCPSocket, Sockets.TCPSocket, VersionNumber}) -precompile(Tuple{typeof(DistributedNext.handle_msg), DistributedNext.ResultMsg, DistributedNext.MsgHeader, Sockets.TCPSocket, Sockets.TCPSocket, VersionNumber}) -precompile(Tuple{typeof(DistributedNext.handle_msg), DistributedNext.IdentifySocketAckMsg, DistributedNext.MsgHeader, Sockets.TCPSocket, Sockets.TCPSocket, VersionNumber}) -precompile(Tuple{typeof(DistributedNext.handle_msg), DistributedNext.RemoteDoMsg, DistributedNext.MsgHeader, Sockets.TCPSocket, Sockets.TCPSocket, VersionNumber}) -precompile(Tuple{typeof(DistributedNext.handle_msg), DistributedNext.JoinPGRPMsg, DistributedNext.MsgHeader, Sockets.TCPSocket, Sockets.TCPSocket, VersionNumber}) 
-precompile(Tuple{typeof(DistributedNext.handle_msg), DistributedNext.JoinCompleteMsg, DistributedNext.MsgHeader, Sockets.TCPSocket, Sockets.TCPSocket, VersionNumber}) - -# These lines were obtained from `addprocs(1; exeflags=`--trace-compile=compilation.txt --trace-compile-timing`)` -precompile(Tuple{typeof(DistributedNext.start_worker)}) -precompile(Tuple{typeof(DistributedNext.start_worker), Base.PipeEndpoint, String}) -precompile(Tuple{typeof(DistributedNext.set_valid_processes), Array{Int64, 1}}) -precompile(Tuple{typeof(DistributedNext.terminate_all_workers)}) - -# This is disabled because it doesn't give much benefit -# and the code in Distributed is poorly typed causing many invalidations -# TODO: Maybe reenable now that Distributed is not in sysimage. -#= - precompile_script *= """ - using Distributed - addprocs(2) - pmap(x->iseven(x) ? 1 : 0, 1:4) - @distributed (+) for i = 1:100 Int(rand(Bool)) end - """ -=# +@compile_workload begin + # Run the workload in a separate ClusterContext so the default one stays clean + ClusterContext() do + # Use an in-process worker to avoid spawning a real process during precompilation + pid = only(addprocs(LocalManager(1, true, true))) + rmprocs(pid) + end +end diff --git a/src/process_messages.jl b/src/process_messages.jl index 094caab..22f3c4d 100644 --- a/src/process_messages.jl +++ b/src/process_messages.jl @@ -81,9 +81,10 @@ function run_work_thunk_remotevalue(rv::RemoteValue, thunk) end function schedule_call(rid, thunk) - return lock(CTX.client_refs) do + ctx = CTX[] + @lock ctx.client_refs begin rv = RemoteValue(def_rv_channel()) - (CTX.pgrp::ProcessGroup).refs[rid] = rv + (ctx.pgrp::ProcessGroup).refs[rid] = rv push!(rv.clientset, rid.whence) errormonitor(@async run_work_thunk_remotevalue(rv, thunk)) return rv @@ -210,6 +211,7 @@ function message_handler_loop(r_stream::IO, w_stream::IO, incoming::Bool) handle_msg(msg, header, r_stream, w_stream, version) end catch e + ctx = CTX[] oldstate = WorkerState_unknown # Check again as it may have been set in a message handler but not propagated to the calling block above @@ -220,13 +222,13 @@ function message_handler_loop(r_stream::IO, w_stream::IO, incoming::Bool) if wpid < 1 println(stderr, e, CapturedException(e, catch_backtrace())) println(stderr, "Process($(myid())) - Unknown remote, closing connection.") - elseif @lock(CTX.map_del_wrkr, !(wpid in CTX.map_del_wrkr[])) + elseif @lock(ctx.map_del_wrkr, !(wpid in ctx.map_del_wrkr[])) werr = worker_from_id(wpid) oldstate = @atomic werr.state set_worker_state(werr, oldstate != WorkerState_terminating ? 
WorkerState_exterminated : WorkerState_terminated) # If unhandleable error occurred talking to pid 1, exit - if wpid == 1 + if wpid == 1 && !ctx.lproc.in_process if isopen(w_stream) @error "Fatal error on process $(myid())" exception=e,catch_backtrace() end @@ -318,24 +320,26 @@ function handle_msg(msg::IdentifySocketMsg, header, r_stream, w_stream, version) throw_if_cluster_manager_unassigned() # register a new peer worker connection - w = Worker(msg.from_pid, r_stream, w_stream, CTX.cluster_manager[]; version=version)::Worker + w = Worker(msg.from_pid, r_stream, w_stream, CTX[].cluster_manager[]; version=version)::Worker send_connection_hdr(w, false) send_msg_now(w, MsgHeader(), IdentifySocketAckMsg()) notify(w.initialized) end function handle_msg(msg::IdentifySocketAckMsg, header, r_stream, w_stream, version) - w = @lock CTX.map_sock_wrkr CTX.map_sock_wrkr[][r_stream] + ctx = CTX[] + w = @lock ctx.map_sock_wrkr ctx.map_sock_wrkr[][r_stream] w.version = version end function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version) throw_if_cluster_manager_unassigned() - CTX.lproc.id = msg.self_pid - controller = Worker(1, r_stream, w_stream, CTX.cluster_manager[]; version=version)::Worker + ctx = CTX[] + ctx.lproc.id = msg.self_pid + controller = Worker(1, r_stream, w_stream, ctx.cluster_manager[]; version=version)::Worker notify(controller.initialized) - register_worker(CTX.lproc) + register_worker(ctx.lproc) topology(msg.topology) if !msg.enable_threaded_blas @@ -343,7 +347,7 @@ function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version) end lazy = msg.lazy - CTX.pgrp.lazy = lazy + ctx.pgrp.lazy = lazy @sync for (connect_at, rpid) in msg.other_workers wconfig = WorkerConfig() @@ -352,9 +356,9 @@ function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version) let rpid=rpid, wconfig=wconfig if lazy # The constructor registers the object with a global registry. 
- Worker(rpid, ()->connect_to_peer(CTX.cluster_manager[], rpid, wconfig)) + Worker(rpid, ()->connect_to_peer(ctx.cluster_manager[], rpid, wconfig)) else - @async connect_to_peer(CTX.cluster_manager[], rpid, wconfig) + @async connect_to_peer(ctx.cluster_manager[], rpid, wconfig) end end end @@ -378,7 +382,8 @@ function connect_to_peer(manager::ClusterManager, rpid::Int, wconfig::WorkerConf end function handle_msg(msg::JoinCompleteMsg, header, r_stream, w_stream, version) - w = @lock CTX.map_sock_wrkr CTX.map_sock_wrkr[][r_stream] + ctx = CTX[] + w = @lock ctx.map_sock_wrkr ctx.map_sock_wrkr[][r_stream] environ = something(w.config.environ, Dict()) environ[:cpu_threads] = msg.cpu_threads w.config.environ = environ diff --git a/src/remotecall.jl b/src/remotecall.jl index ca02791..fbb789e 100644 --- a/src/remotecall.jl +++ b/src/remotecall.jl @@ -57,7 +57,7 @@ mutable struct RemoteChannel{T<:AbstractChannel} <: AbstractRemoteRef end function test_existing_ref(r::AbstractRemoteRef) - found = getkey(CTX.client_refs, r, nothing) + found = getkey(CTX[].client_refs, r, nothing) if found !== nothing @assert r.where > 0 if isa(r, Future) @@ -75,16 +75,16 @@ function test_existing_ref(r::AbstractRemoteRef) return found::typeof(r) end - CTX.client_refs[r] = nothing + CTX[].client_refs[r] = nothing finalizer(finalize_ref, r) return r end function finalize_ref(r::AbstractRemoteRef) if r.where > 0 # Handle the case of the finalizer having been called manually - if trylock(CTX.client_refs.lock) # trylock doesn't call wait which causes yields + if trylock(CTX[].client_refs.lock) # trylock doesn't call wait which causes yields try - delete!(CTX.client_refs.ht, r) # direct removal avoiding locks + delete!(CTX[].client_refs.ht, r) # direct removal avoiding locks if isa(r, RemoteChannel) send_del_client_no_lock(r) else @@ -95,7 +95,7 @@ function finalize_ref(r::AbstractRemoteRef) end r.where = 0 finally - unlock(CTX.client_refs.lock) + unlock(CTX[].client_refs.lock) end else finalizer(finalize_ref, r) @@ -160,8 +160,8 @@ A low-level API which returns the backing `AbstractChannel` for an `id` returned The call is valid only on the node where the backing channel exists. 
""" function channel_from_id(id) - rv = lock(CTX.client_refs) do - return get(CTX.pgrp.refs, id, false) + rv = lock(CTX[].client_refs) do + return get(CTX[].pgrp.refs, id, false) end if rv === false throw(ErrorException("Local instance of remote reference not found")) @@ -169,9 +169,9 @@ function channel_from_id(id) return rv.c end -lookup_ref(rrid::RRID, f=def_rv_channel) = lookup_ref(CTX.pgrp, rrid, f) +lookup_ref(rrid::RRID, f=def_rv_channel) = lookup_ref(CTX[].pgrp, rrid, f) function lookup_ref(pg, rrid, f) - return lock(CTX.client_refs) do + return lock(CTX[].client_refs) do rv = get(pg.refs, rrid, false) if rv === false # first we've heard of this ref @@ -230,9 +230,9 @@ end del_client(rr::AbstractRemoteRef) = del_client(remoteref_id(rr), myid()) -del_client(id, client) = del_client(CTX.pgrp, id, client) +del_client(id, client) = del_client(CTX[].pgrp, id, client) function del_client(pg, id, client) - lock(CTX.client_refs) do + lock(CTX[].client_refs) do _del_client(pg, id, client) end nothing @@ -264,11 +264,11 @@ end function start_gc_msgs_task() errormonitor( Threads.@spawn begin - while true - lock(CTX.any_gc_flag) do - # this might miss events - wait(CTX.any_gc_flag) - end + ctx = CTX[] + while !ctx.shutting_down[] + # This might miss events + @lock ctx.any_gc_flag wait(ctx.any_gc_flag) + # Use invokelatest() so that custom message transport streams # for workers can be defined in a newer world age than the Task # which runs the loop here. @@ -290,7 +290,7 @@ end function send_del_client_no_lock(rr) # for gc context to avoid yields if rr.where == myid() - _del_client(CTX.pgrp, remoteref_id(rr), myid()) + _del_client(CTX[].pgrp, remoteref_id(rr), myid()) elseif id_in_procs(rr.where) # process only if a valid worker process_worker(rr) end @@ -301,8 +301,8 @@ function publish_del_msg!(w::Worker, msg) push!(w.del_msgs, msg) @atomic w.gcflag = true end - lock(CTX.any_gc_flag) do - notify(CTX.any_gc_flag) + lock(CTX[].any_gc_flag) do + notify(CTX[].any_gc_flag) end end @@ -320,7 +320,7 @@ function process_worker(rr) end function add_client(id, client) - lock(CTX.client_refs) do + lock(CTX[].client_refs) do rv = lookup_ref(id) push!(rv.clientset, client) end @@ -345,8 +345,8 @@ function send_add_client(rr::AbstractRemoteRef, i) push!(w.add_msgs, (remoteref_id(rr), i)) @atomic w.gcflag = true end - lock(CTX.any_gc_flag) do - notify(CTX.any_gc_flag) + lock(CTX[].any_gc_flag) do + notify(CTX[].any_gc_flag) end end end @@ -448,8 +448,8 @@ function remotecall_fetch(f, w::Worker, args...; kwargs...) rv.waitingfor = w.id send_msg(w, MsgHeader(RRID(0,0), oid), CallMsg{:call_fetch}(f, args, kwargs)) v = take!(rv) - lock(CTX.client_refs) do - delete!(CTX.pgrp.refs, oid) + lock(CTX[].client_refs) do + delete!(CTX[].pgrp.refs, oid) end return isa(v, RemoteException) ? throw(v) : v end @@ -490,8 +490,8 @@ function remotecall_wait(f, w::Worker, args...; kwargs...) 
rr = Future(w) send_msg(w, MsgHeader(remoteref_id(rr), prid), CallWaitMsg(f, args, kwargs)) v = fetch(rv.c) - lock(CTX.client_refs) do - delete!(CTX.pgrp.refs, prid) + lock(CTX[].client_refs) do + delete!(CTX[].pgrp.refs, prid) end isa(v, RemoteException) && throw(v) return rr diff --git a/src/workerpool.jl b/src/workerpool.jl index ed7abc6..7b77b91 100644 --- a/src/workerpool.jl +++ b/src/workerpool.jl @@ -341,14 +341,14 @@ WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:3), Set([4, 2, 3]), function default_worker_pool()::AbstractWorkerPool # On workers retrieve the default worker pool from the master when accessed # for the first time - if CTX.default_worker_pool[] === nothing + if CTX[].default_worker_pool[] === nothing if myid() == 1 - CTX.default_worker_pool[] = WorkerPool() + CTX[].default_worker_pool[] = WorkerPool() else - CTX.default_worker_pool[] = remotecall_fetch(()->default_worker_pool(), 1) + CTX[].default_worker_pool[] = remotecall_fetch(()->default_worker_pool(), 1) end end - return CTX.default_worker_pool[]::AbstractWorkerPool + return CTX[].default_worker_pool[]::AbstractWorkerPool end """ @@ -357,7 +357,7 @@ end Set a [`AbstractWorkerPool`](@ref) to be used by `remote(f)` and [`pmap`](@ref) (by default). """ function default_worker_pool!(pool::AbstractWorkerPool) - CTX.default_worker_pool[] = pool + CTX[].default_worker_pool[] = pool end """ diff --git a/test/distributed_exec.jl b/test/distributed_exec.jl index 7ff66d0..f264288 100644 --- a/test/distributed_exec.jl +++ b/test/distributed_exec.jl @@ -170,21 +170,21 @@ function test_futures_dgc(id) fid = remoteref_id(f) # remote value should be deleted after a fetch - @test remotecall_fetch(k->(yield();haskey(DistributedNext.CTX.pgrp.refs, k)), id, fid) == true + @test remotecall_fetch(k->(yield();haskey(DistributedNext.CTX[].pgrp.refs, k)), id, fid) == true @test f.v === nothing @test fetch(f) == id @test f.v !== nothing yield(); # flush gc msgs - @test poll_while(() -> remotecall_fetch(k->(yield();haskey(DistributedNext.CTX.pgrp.refs, k)), id, fid)) + @test poll_while(() -> remotecall_fetch(k->(yield();haskey(DistributedNext.CTX[].pgrp.refs, k)), id, fid)) # if unfetched, it should be deleted after a finalize f = remotecall(myid, id) fid = remoteref_id(f) - @test remotecall_fetch(k->(yield();haskey(DistributedNext.CTX.pgrp.refs, k)), id, fid) == true + @test remotecall_fetch(k->(yield();haskey(DistributedNext.CTX[].pgrp.refs, k)), id, fid) == true @test f.v === nothing finalize(f) yield(); # flush gc msgs - @test poll_while(() -> remotecall_fetch(k->(yield();haskey(DistributedNext.CTX.pgrp.refs, k)), id, fid)) + @test poll_while(() -> remotecall_fetch(k->(yield();haskey(DistributedNext.CTX[].pgrp.refs, k)), id, fid)) end @testset "GC tests for Futures" begin @@ -204,23 +204,23 @@ end put!(fstore, f) @test fetch(f) == wid1 - @test remotecall_fetch(k->haskey(DistributedNext.CTX.pgrp.refs, k), wid1, fid) == true + @test remotecall_fetch(k->haskey(DistributedNext.CTX[].pgrp.refs, k), wid1, fid) == true remotecall_fetch(r->(fetch(fetch(r)); yield()), wid2, fstore) sleep(0.5) # to ensure that wid2 gc messages have been executed on wid1 - @test remotecall_fetch(k->haskey(DistributedNext.CTX.pgrp.refs, k), wid1, fid) == false + @test remotecall_fetch(k->haskey(DistributedNext.CTX[].pgrp.refs, k), wid1, fid) == false # put! 
should release remote reference since it would have been cached locally f = Future(wid1) fid = remoteref_id(f) # should not be created remotely till accessed - @test remotecall_fetch(k->haskey(DistributedNext.CTX.pgrp.refs, k), wid1, fid) == false + @test remotecall_fetch(k->haskey(DistributedNext.CTX[].pgrp.refs, k), wid1, fid) == false # create it remotely isready(f) - @test remotecall_fetch(k->haskey(DistributedNext.CTX.pgrp.refs, k), wid1, fid) == true + @test remotecall_fetch(k->haskey(DistributedNext.CTX[].pgrp.refs, k), wid1, fid) == true put!(f, :OK) - @test remotecall_fetch(k->haskey(DistributedNext.CTX.pgrp.refs, k), wid1, fid) == false + @test remotecall_fetch(k->haskey(DistributedNext.CTX[].pgrp.refs, k), wid1, fid) == false @test fetch(f) === :OK # RemoteException should be thrown on a put! when another process has set the value @@ -231,7 +231,7 @@ end put!(fstore, f) # send f to wid2 put!(f, :OK) # set value from master - @test remotecall_fetch(k->haskey(DistributedNext.CTX.pgrp.refs, k), wid1, fid) == true + @test remotecall_fetch(k->haskey(DistributedNext.CTX[].pgrp.refs, k), wid1, fid) == true testval = remotecall_fetch(wid2, fstore) do x try @@ -256,14 +256,14 @@ end f = remotecall_wait(identity, id_other, ones(10)) rrid = DistributedNext.RRID(f.whence, f.id) remotecall_fetch(f25847, id_other, f) - @test BitSet([id_me]) == remotecall_fetch(()->DistributedNext.CTX.pgrp.refs[rrid].clientset, id_other) + @test BitSet([id_me]) == remotecall_fetch(()->DistributedNext.CTX[].pgrp.refs[rrid].clientset, id_other) remotecall_fetch(f25847, id_other, f) - @test BitSet([id_me]) == remotecall_fetch(()->DistributedNext.CTX.pgrp.refs[rrid].clientset, id_other) + @test BitSet([id_me]) == remotecall_fetch(()->DistributedNext.CTX[].pgrp.refs[rrid].clientset, id_other) finalize(f) yield() # flush gc msgs - @test poll_while(() -> remotecall_fetch(chk_rrid->(yield(); haskey(DistributedNext.CTX.pgrp.refs, chk_rrid)), id_other, rrid)) + @test poll_while(() -> remotecall_fetch(chk_rrid->(yield(); haskey(DistributedNext.CTX[].pgrp.refs, chk_rrid)), id_other, rrid)) end @testset "GC tests for RemoteChannels" begin @@ -273,12 +273,12 @@ end rrid = remoteref_id(rr) # remote value should be deleted after finalizing the ref - @test remotecall_fetch(k->(yield();haskey(DistributedNext.CTX.pgrp.refs, k)), id, rrid) == true + @test remotecall_fetch(k->(yield();haskey(DistributedNext.CTX[].pgrp.refs, k)), id, rrid) == true @test fetch(rr) === :OK - @test remotecall_fetch(k->(yield();haskey(DistributedNext.CTX.pgrp.refs, k)), id, rrid) == true + @test remotecall_fetch(k->(yield();haskey(DistributedNext.CTX[].pgrp.refs, k)), id, rrid) == true finalize(rr) yield(); # flush gc msgs - @test poll_while(() -> remotecall_fetch(k->(yield();haskey(DistributedNext.CTX.pgrp.refs, k)), id, rrid)) + @test poll_while(() -> remotecall_fetch(k->(yield();haskey(DistributedNext.CTX[].pgrp.refs, k)), id, rrid)) end test_remoteref_dgc(id_me) test_remoteref_dgc(id_other) @@ -291,13 +291,13 @@ end fstore = RemoteChannel(wid2) put!(fstore, rr) - @test timedwait(() -> remotecall_fetch(k -> haskey(DistributedNext.CTX.pgrp.refs, k), wid1, rrid), 10) == :ok + @test timedwait(() -> remotecall_fetch(k -> haskey(DistributedNext.CTX[].pgrp.refs, k), wid1, rrid), 10) == :ok finalize(rr) # finalize locally yield() # flush gc msgs - @test remotecall_fetch(k -> haskey(DistributedNext.CTX.pgrp.refs, k), wid1, rrid) == true + @test remotecall_fetch(k -> haskey(DistributedNext.CTX[].pgrp.refs, k), wid1, rrid) == true remotecall_fetch(r -> 
(finalize(take!(r)); yield(); nothing), wid2, fstore) # finalize remotely sleep(0.5) # to ensure that wid2 messages have been executed on wid1 - @test poll_while(() -> remotecall_fetch(k -> haskey(DistributedNext.CTX.pgrp.refs, k), wid1, rrid)) + @test poll_while(() -> remotecall_fetch(k -> haskey(DistributedNext.CTX[].pgrp.refs, k), wid1, rrid)) end end @@ -813,7 +813,7 @@ if DoFullTest all_w = workers() # Test sending fake data to workers. The worker processes will print an # error message but should not terminate. - for w in DistributedNext.CTX.pgrp.workers + for w in DistributedNext.CTX[].pgrp.workers if isa(w, DistributedNext.Worker) local s = connect(w.config.host, w.config.port) write(s, randstring(32)) @@ -1120,7 +1120,7 @@ end end function test_blas_config(pid, expected) - for worker in DistributedNext.CTX.pgrp.workers + for worker in DistributedNext.CTX[].pgrp.workers if worker.id == pid @test worker.config.enable_threaded_blas == expected return @@ -1628,7 +1628,7 @@ function launch(manager::WorkerArgTester, params::Dict, launched::Array, c::Cond exename = params[:exename] exeflags = params[:exeflags] - cmd = `$exename $exeflags --bind-to $(DistributedNext.CTX.lproc.bind_addr) $(manager.worker_opt)` + cmd = `$exename $exeflags --bind-to $(DistributedNext.CTX[].lproc.bind_addr) $(manager.worker_opt)` cmd = pipeline(detach(setenv(cmd, dir=dir))) io = open(cmd, "r+") manager.write_cookie && DistributedNext.write_cookie(io) @@ -1671,7 +1671,7 @@ nprocs()>1 && rmprocs(workers()) exeflags = params[:exeflags] jlcmd = "using DistributedNext; start_worker(\"\"; close_stdin=$(manager.close_stdin), stderr_to_stdout=$(manager.stderr_to_stdout));" - cmd = detach(setenv(`$exename $exeflags --bind-to $(DistributedNext.CTX.lproc.bind_addr) -e $jlcmd`, dir=dir)) + cmd = detach(setenv(`$exename $exeflags --bind-to $(DistributedNext.CTX[].lproc.bind_addr) -e $jlcmd`, dir=dir)) proc = open(cmd, "r+") wconfig = WorkerConfig() @@ -1716,7 +1716,7 @@ end remotecall_fetch(p) do ports_lower = [] # ports of pids lower than myid() ports_higher = [] # ports of pids higher than myid() - for w in DistributedNext.CTX.pgrp.workers + for w in DistributedNext.CTX[].pgrp.workers w.id == myid() && continue port = Sockets._sockname(w.r_stream, true)[2] if (w.id == 1) @@ -2015,7 +2015,7 @@ end # status to have been deleted. Only works if the worker has a status of # course. function wait_for_deregistration(pid) - statuses = DistributedNext.CTX.map_pid_statuses + statuses = DistributedNext.CTX[].map_pid_statuses @test timedwait(() -> @lock(statuses, !haskey(statuses[], pid)), 10) == :ok end diff --git a/test/topology.jl b/test/topology.jl index 448be5d..6c75a71 100644 --- a/test/topology.jl +++ b/test/topology.jl @@ -44,7 +44,7 @@ using Random exename = params[:exename] exeflags = params[:exeflags] - cmd = `$exename $exeflags --bind-to $(DistributedNext.CTX.lproc.bind_addr) $(DistributedNext.get_worker_arg())` + cmd = `$exename $exeflags --bind-to $(DistributedNext.CTX[].lproc.bind_addr) $(DistributedNext.get_worker_arg())` cmd = pipeline(detach(setenv(cmd, dir=dir))) for i in 1:manager.np io = open(cmd, "r+") @@ -101,7 +101,7 @@ using Random @everywhere if !isdefined(Main, :count_connected_workers) function count_connected_workers() count(x -> isa(x, DistributedNext.Worker) && isdefined(x, :r_stream) && isopen(x.r_stream), - DistributedNext.CTX.pgrp.workers) + DistributedNext.CTX[].pgrp.workers) end end end