diff --git a/deps/gen_consts.jl b/deps/gen_consts.jl index 0131d4e41..dd9cf5fe2 100644 --- a/deps/gen_consts.jl +++ b/deps/gen_consts.jl @@ -86,6 +86,10 @@ MPI_handle = [ ] MPI_Cints = [ + :MPI_THREAD_SINGLE, + :MPI_THREAD_FUNNELED, + :MPI_THREAD_SERIALIZED, + :MPI_THREAD_MULTIPLE, :MPI_PROC_NULL, :MPI_ANY_SOURCE, :MPI_ANY_TAG, diff --git a/docs/src/environment.md b/docs/src/environment.md index 1d9eeec8c..9f5103959 100644 --- a/docs/src/environment.md +++ b/docs/src/environment.md @@ -11,6 +11,7 @@ mpiexec ```@docs MPI.Abort MPI.Init +MPI.Init_thread MPI.Initialized MPI.Finalize MPI.Finalized diff --git a/src/MPI.jl b/src/MPI.jl index 6f54a97aa..6b0b5cbe6 100644 --- a/src/MPI.jl +++ b/src/MPI.jl @@ -86,6 +86,14 @@ function __init__() ENV["UCX_MEM_MALLOC_RELOC"] = "no" ENV["UCX_MEM_EVENTS"] = "no" + # Julia multithreading uses SIGSEGV to sync thread + # https://docs.julialang.org/en/v1/devdocs/debuggingtips/#Dealing-with-signals-1 + # By default, UCX will error if this occurs (issue #337) + if !haskey(ENV, "UCX_ERROR_SIGNALS") + # default is "SIGILL,SIGSEGV,SIGBUS,SIGFPE" + ENV["UCX_ERROR_SIGNALS"] = "SIGILL,SIGBUS,SIGFPE" + end + @require CuArrays="3a865a2d-5b23-5a0f-bc46-62713ec82fae" include("cuda.jl") end diff --git a/src/consts/microsoftmpi.jl b/src/consts/microsoftmpi.jl index 7c88ca52b..e4c76f71c 100644 --- a/src/consts/microsoftmpi.jl +++ b/src/consts/microsoftmpi.jl @@ -1,3 +1,5 @@ +# https://github.com/microsoft/Microsoft-MPI/blob/master/src/include/mpi.h + const MPI_Aint = Int const MPI_Offset = Int64 const MPI_Count = Int64 @@ -64,6 +66,10 @@ const MPI_C_DOUBLE_COMPLEX = reinterpret(Cint, 0x4c001014) const MPI_File = Cint const MPI_FILE_NULL = Cint(0) +const MPI_THREAD_SINGLE = Cint(0) +const MPI_THREAD_FUNNELED = Cint(1) +const MPI_THREAD_SERIALIZED = Cint(2) +const MPI_THREAD_MULTIPLE = Cint(3) const MPI_PROC_NULL = Cint(-1) const MPI_ANY_SOURCE = Cint(-2) const MPI_ANY_TAG = Cint(-1) diff --git a/src/consts/mpich.jl b/src/consts/mpich.jl 
index 64dab1204..4636370fd 100644 --- a/src/consts/mpich.jl +++ b/src/consts/mpich.jl @@ -1,3 +1,5 @@ +# https://github.com/pmodels/mpich/blob/master/src/include/mpi.h.in + const MPI_Aint = Int const MPI_Count = Int64 const MPI_Offset = Int64 @@ -69,6 +71,10 @@ const MPI_UINT64_T = Cint(1275070526) const MPI_C_FLOAT_COMPLEX = Cint(1275070528) const MPI_C_DOUBLE_COMPLEX = Cint(1275072577) +const MPI_THREAD_SINGLE = Cint(0) +const MPI_THREAD_FUNNELED = Cint(1) +const MPI_THREAD_SERIALIZED = Cint(2) +const MPI_THREAD_MULTIPLE = Cint(3) const MPI_PROC_NULL = Cint(-1) const MPI_ANY_SOURCE = Cint(-2) const MPI_ANY_TAG = Cint(-1) diff --git a/src/consts/openmpi.jl b/src/consts/openmpi.jl index 8419bc8cb..0dd871a33 100644 --- a/src/consts/openmpi.jl +++ b/src/consts/openmpi.jl @@ -1,3 +1,5 @@ +# https://github.com/open-mpi/ompi/blob/master/ompi/include/mpi.h.in + const MPI_Aint = Int const MPI_Count = Int64 const MPI_Offset = Int64 @@ -75,6 +77,10 @@ const MPI_UINT64_T = Cint(65) const MPI_C_FLOAT_COMPLEX = Cint(69) const MPI_C_DOUBLE_COMPLEX = Cint(70) +const MPI_THREAD_SINGLE = Cint(0) +const MPI_THREAD_FUNNELED = Cint(1) +const MPI_THREAD_SERIALIZED = Cint(2) +const MPI_THREAD_MULTIPLE = Cint(3) const MPI_PROC_NULL = Cint(-2) const MPI_ANY_SOURCE = Cint(-1) const MPI_ANY_TAG = Cint(-1) diff --git a/src/environment.jl b/src/environment.jl index 6e2d0f9d0..a7bb3f075 100644 --- a/src/environment.jl +++ b/src/environment.jl @@ -34,9 +34,11 @@ end Initialize MPI in the current process. -All MPI programs must contain exactly one call to `MPI.Init()`. In particular, note that it is not valid to call `MPI.Init` again after calling [`MPI.Finalize`](@ref). +All MPI programs must contain exactly one call to `MPI.Init` or +[`MPI.Init_thread`](@ref). In particular, note that it is not valid to call `MPI.Init` or +`MPI.Init_thread` again after calling [`MPI.Finalize`](@ref). 
-The only MPI functions that may be called before `MPI.Init()` are +The only MPI functions that may be called before `MPI.Init`/`MPI.Init_thread` are [`MPI.Initialized`](@ref) and [`MPI.Finalized`](@ref). # External links @@ -53,6 +55,64 @@ function Init() end end +@enum ThreadLevel begin + THREAD_SINGLE = MPI_THREAD_SINGLE + THREAD_FUNNELED = MPI_THREAD_FUNNELED + THREAD_SERIALIZED = MPI_THREAD_SERIALIZED + THREAD_MULTIPLE = MPI_THREAD_MULTIPLE +end + + +""" + Init_thread(required::ThreadLevel) + +Initialize MPI and the MPI thread environment in the current process. The argument +specifies the required thread level, which is one of the following: + + - `MPI.THREAD_SINGLE`: Only one thread will execute. + - `MPI.THREAD_FUNNELED`: The process may be multi-threaded, but the application must ensure that only the main thread makes MPI calls. + - `MPI.THREAD_SERIALIZED`: The process may be multi-threaded, and multiple threads may make MPI calls, but only one at a time (i.e. all MPI calls are serialized). + - `MPI.THREAD_MULTIPLE`: Multiple threads may call MPI, with no restrictions. + +The function will return the provided `ThreadLevel`, and values may be compared via inequalities, i.e. +```julia +if Init_thread(required) < required + error("Insufficient threading") +end +``` + +All MPI programs must contain exactly one call to [`MPI.Init`](@ref) or +`MPI.Init_thread`. In particular, note that it is not valid to call `MPI.Init` or +`MPI.Init_thread` again after calling [`MPI.Finalize`](@ref). + +The only MPI functions that may be called before `MPI.Init`/`MPI.Init_thread` are +[`MPI.Initialized`](@ref) and [`MPI.Finalized`](@ref). 
+ +# External links +$(_doc_external("MPI_Init_thread")) +""" +function Init_thread(required::ThreadLevel) + REFCOUNT[] == -1 || error("MPI.REFCOUNT in incorrect state: MPI may only be initialized once per session.") + r_provided = Ref{ThreadLevel}() + # int MPI_Init_thread(int *argc, char ***argv, int required, int *provided) + @mpichk ccall((:MPI_Init_thread, libmpi), Cint, + (Ptr{Cint},Ptr{Cvoid}, ThreadLevel, Ref{ThreadLevel}), + C_NULL, C_NULL, required, r_provided) + provided = r_provided[] + if provided < required + @warn "Thread level requested = $required, provided = $provided" + end + + REFCOUNT[] = 1 + atexit(refcount_dec) + + for f in mpi_init_hooks + f() + end + return provided +end + + """ Finalize() diff --git a/test/runtests.jl b/test/runtests.jl index e0e6a4128..ac2c08224 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -17,9 +17,6 @@ const coverage_opts = JL_LOG_USER => "user", JL_LOG_ALL => "all") -# Files to run with mpiexec -n 1 -singlefiles = ["test_spawn.jl"] - function runtests() nprocs = clamp(Sys.CPU_THREADS, 2, 4) exename = joinpath(Sys.BINDIR, Base.julia_exename()) @@ -33,8 +30,12 @@ function runtests() for f in testfiles coverage_opt = coverage_opts[Base.JLOptions().code_coverage] mpiexec() do cmd - if f ∈ singlefiles + if f == "test_spawn.jl" run(`$cmd -n 1 $exename --code-coverage=$coverage_opt $(joinpath(testdir, f))`) + elseif f == "test_threads.jl" + withenv("JULIA_NUM_THREADS" => "4") do + run(`$cmd -n $nprocs $exename --code-coverage=$coverage_opt $(joinpath(testdir, f))`) + end else run(`$cmd -n $nprocs $exename --code-coverage=$coverage_opt $(joinpath(testdir, f))`) end diff --git a/test/test_threads.jl b/test/test_threads.jl new file mode 100644 index 000000000..78e0086d7 --- /dev/null +++ b/test/test_threads.jl @@ -0,0 +1,38 @@ +using Test, Pkg +using MPI + +if get(ENV,"JULIA_MPI_TEST_ARRAYTYPE","") == "CuArray" + using CuArrays + ArrayType = CuArray +else + ArrayType = Array +end + +provided = 
MPI.Init_thread(MPI.THREAD_MULTIPLE) + +comm = MPI.COMM_WORLD +size = MPI.Comm_size(comm) +rank = MPI.Comm_rank(comm) + +const N = 10 + +dst = mod(rank+1, size) +src = mod(rank-1, size) + +if provided == MPI.THREAD_MULTIPLE + send_arr = collect(1.0:N) + recv_arr = zeros(N) + + reqs = Array{MPI.Request}(undef, 2N) + + Threads.@threads for i = 1:N + reqs[N+i] = MPI.Irecv!(@view(recv_arr[i:i]), src, i, comm) + reqs[i] = MPI.Isend(@view(send_arr[i:i]), dst, i, comm) + end + + MPI.Waitall!(reqs) + + @test recv_arr == send_arr +end + +MPI.Finalize()