Add threading support (#363)
Adds `MPI.Init_thread` and the `ThreadLevel` enum, along with a threaded test.

Additionally, set the `UCX_ERROR_SIGNALS` environment variable (if it is not already set) to fix #337.
simonbyrne committed Mar 20, 2020
1 parent ac4ed7a commit 84efa98
Showing 9 changed files with 136 additions and 6 deletions.
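
For orientation, a minimal usage sketch of the API this commit adds (not code from the diff; it simply mirrors the `Init_thread` docstring introduced below):

```julia
using MPI

# Request full multi-threaded support; MPI may provide a lower level,
# so compare the returned ThreadLevel against what was requested.
provided = MPI.Init_thread(MPI.THREAD_MULTIPLE)
if provided < MPI.THREAD_MULTIPLE
    @warn "MPI provided only $provided"
end

# ... MPI calls from multiple threads are only safe if THREAD_MULTIPLE was provided ...

MPI.Finalize()
```
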
4 changes: 4 additions & 0 deletions deps/gen_consts.jl
@@ -86,6 +86,10 @@ MPI_handle = [
]

MPI_Cints = [
:MPI_THREAD_SINGLE,
:MPI_THREAD_FUNNELED,
:MPI_THREAD_SERIALIZED,
:MPI_THREAD_MULTIPLE,
:MPI_PROC_NULL,
:MPI_ANY_SOURCE,
:MPI_ANY_TAG,
1 change: 1 addition & 0 deletions docs/src/environment.md
@@ -11,6 +11,7 @@ mpiexec
```@docs
MPI.Abort
MPI.Init
MPI.Init_thread
MPI.Initialized
MPI.Finalize
MPI.Finalized
8 changes: 8 additions & 0 deletions src/MPI.jl
@@ -86,6 +86,14 @@ function __init__()
ENV["UCX_MEM_MALLOC_RELOC"] = "no"
ENV["UCX_MEM_EVENTS"] = "no"

# Julia multithreading uses SIGSEGV to sync threads
# https://docs.julialang.org/en/v1/devdocs/debuggingtips/#Dealing-with-signals-1
# By default, UCX will error if this occurs (issue #337)
if !haskey(ENV, "UCX_ERROR_SIGNALS")
# default is "SIGILL,SIGSEGV,SIGBUS,SIGFPE"
ENV["UCX_ERROR_SIGNALS"] = "SIGILL,SIGBUS,SIGFPE"
end

@require CuArrays="3a865a2d-5b23-5a0f-bc46-62713ec82fae" include("cuda.jl")
end

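
If UCX's default behaviour (which treats SIGSEGV as an error) is actually wanted, the workaround can be overridden by setting the variable before MPI's `__init__` runs; a hedged sketch:

```julia
# Restore UCX's own default error-signal list (it includes SIGSEGV).
# __init__ only sets UCX_ERROR_SIGNALS when the variable is absent from ENV,
# so setting it before `using MPI` takes precedence.
ENV["UCX_ERROR_SIGNALS"] = "SIGILL,SIGSEGV,SIGBUS,SIGFPE"

using MPI
```
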
6 changes: 6 additions & 0 deletions src/consts/microsoftmpi.jl
@@ -1,3 +1,5 @@
# https://github.com/microsoft/Microsoft-MPI/blob/master/src/include/mpi.h

const MPI_Aint = Int
const MPI_Offset = Int64
const MPI_Count = Int64
@@ -64,6 +66,10 @@ const MPI_C_DOUBLE_COMPLEX = reinterpret(Cint, 0x4c001014)
const MPI_File = Cint
const MPI_FILE_NULL = Cint(0)

const MPI_THREAD_SINGLE = Cint(0)
const MPI_THREAD_FUNNELED = Cint(1)
const MPI_THREAD_SERIALIZED = Cint(2)
const MPI_THREAD_MULTIPLE = Cint(3)
const MPI_PROC_NULL = Cint(-1)
const MPI_ANY_SOURCE = Cint(-2)
const MPI_ANY_TAG = Cint(-1)
6 changes: 6 additions & 0 deletions src/consts/mpich.jl
@@ -1,3 +1,5 @@
# https://github.com/pmodels/mpich/blob/master/src/include/mpi.h.in

const MPI_Aint = Int
const MPI_Count = Int64
const MPI_Offset = Int64
@@ -69,6 +71,10 @@ const MPI_UINT64_T = Cint(1275070526)
const MPI_C_FLOAT_COMPLEX = Cint(1275070528)
const MPI_C_DOUBLE_COMPLEX = Cint(1275072577)

const MPI_THREAD_SINGLE = Cint(0)
const MPI_THREAD_FUNNELED = Cint(1)
const MPI_THREAD_SERIALIZED = Cint(2)
const MPI_THREAD_MULTIPLE = Cint(3)
const MPI_PROC_NULL = Cint(-1)
const MPI_ANY_SOURCE = Cint(-2)
const MPI_ANY_TAG = Cint(-1)
6 changes: 6 additions & 0 deletions src/consts/openmpi.jl
@@ -1,3 +1,5 @@
# https://github.com/open-mpi/ompi/blob/master/ompi/include/mpi.h.in

const MPI_Aint = Int
const MPI_Count = Int64
const MPI_Offset = Int64
@@ -75,6 +77,10 @@ const MPI_UINT64_T = Cint(65)
const MPI_C_FLOAT_COMPLEX = Cint(69)
const MPI_C_DOUBLE_COMPLEX = Cint(70)

const MPI_THREAD_SINGLE = Cint(0)
const MPI_THREAD_FUNNELED = Cint(1)
const MPI_THREAD_SERIALIZED = Cint(2)
const MPI_THREAD_MULTIPLE = Cint(3)
const MPI_PROC_NULL = Cint(-2)
const MPI_ANY_SOURCE = Cint(-1)
const MPI_ANY_TAG = Cint(-1)
64 changes: 62 additions & 2 deletions src/environment.jl
@@ -34,9 +34,11 @@ end
Initialize MPI in the current process.
All MPI programs must contain exactly one call to `MPI.Init()`. In particular, note that it is not valid to call `MPI.Init` again after calling [`MPI.Finalize`](@ref).
All MPI programs must contain exactly one call to `MPI.Init` or
[`MPI.Init_thread`](@ref). In particular, note that it is not valid to call `MPI.Init` or
`MPI.Init_thread` again after calling [`MPI.Finalize`](@ref).
The only MPI functions that may be called before `MPI.Init()` are
The only MPI functions that may be called before `MPI.Init`/`MPI.Init_thread` are
[`MPI.Initialized`](@ref) and [`MPI.Finalized`](@ref).
# External links
@@ -53,6 +55,64 @@ function Init()
end
end

@enum ThreadLevel begin
THREAD_SINGLE = MPI_THREAD_SINGLE
THREAD_FUNNELED = MPI_THREAD_FUNNELED
THREAD_SERIALIZED = MPI_THREAD_SERIALIZED
THREAD_MULTIPLE = MPI_THREAD_MULTIPLE
end


"""
Init_thread(required::ThreadLevel)
Initialize MPI and the MPI thread environment in the current process. The argument
specifies the required thread level, which is one of the following:
- `MPI.THREAD_SINGLE`: Only one thread will execute.
- `MPI.THREAD_FUNNELED`: The process may be multi-threaded, but the application must ensure that only the main thread makes MPI calls.
- `MPI.THREAD_SERIALIZED`: The process may be multi-threaded, and multiple threads may make MPI calls, but only one at a time (i.e. all MPI calls are serialized).
- `MPI.THREAD_MULTIPLE`: Multiple threads may call MPI, with no restrictions.
The function will return the provided `ThreadLevel`, and values may be compared via inequalities, i.e.
```julia
if Init_thread(required) < required
error("Insufficient threading")
end
```
All MPI programs must contain exactly one call to [`MPI.Init`](@ref) or
`MPI.Init_thread`. In particular, note that it is not valid to call `MPI.Init` or
`MPI.Init_thread` again after calling [`MPI.Finalize`](@ref).
The only MPI functions that may be called before `MPI.Init`/`MPI.Init_thread` are
[`MPI.Initialized`](@ref) and [`MPI.Finalized`](@ref).
# External links
$(_doc_external("MPI_Init_thread"))
"""
function Init_thread(required::ThreadLevel)
REFCOUNT[] == -1 || error("MPI.REFCOUNT in incorrect state: MPI may only be initialized once per session.")
r_provided = Ref{ThreadLevel}()
# int MPI_Init_thread(int *argc, char ***argv, int required, int *provided)
@mpichk ccall((:MPI_Init_thread, libmpi), Cint,
(Ptr{Cint},Ptr{Cvoid}, ThreadLevel, Ref{ThreadLevel}),
C_NULL, C_NULL, required, r_provided)
provided = r_provided[]
if provided < required
@warn "Thread level requested = $required, provided = $provided"
end

REFCOUNT[] = 1
atexit(refcount_dec)

for f in mpi_init_hooks
f()
end
return provided
end


"""
Finalize()
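
Since `ThreadLevel` is a Julia `@enum` built from the MPI constants, the inequality comparison shown in the docstring works out of the box; a small illustrative sketch (not part of the commit, assuming the constant values shown above):

```julia
using MPI

# Enum instances are ordered by their underlying integer values, so the
# thread levels compare in increasing order of capability.
@assert MPI.THREAD_SINGLE < MPI.THREAD_FUNNELED < MPI.THREAD_SERIALIZED < MPI.THREAD_MULTIPLE

# The underlying value matches the MPI constant, e.g. MPI_THREAD_MULTIPLE == 3
# for the MPICH, Microsoft MPI and Open MPI headers covered in this commit.
@assert Int(MPI.THREAD_MULTIPLE) == 3
```
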
9 changes: 5 additions & 4 deletions test/runtests.jl
@@ -17,9 +17,6 @@ const coverage_opts =
JL_LOG_USER => "user",
JL_LOG_ALL => "all")

# Files to run with mpiexec -n 1
singlefiles = ["test_spawn.jl"]

function runtests()
nprocs = clamp(Sys.CPU_THREADS, 2, 4)
exename = joinpath(Sys.BINDIR, Base.julia_exename())
@@ -33,8 +30,12 @@ function runtests()
for f in testfiles
coverage_opt = coverage_opts[Base.JLOptions().code_coverage]
mpiexec() do cmd
if f ∈ singlefiles
if f == "test_spawn.jl"
run(`$cmd -n 1 $exename --code-coverage=$coverage_opt $(joinpath(testdir, f))`)
elseif f == "test_threads.jl"
withenv("JULIA_NUM_THREAD" => "4") do
run(`$cmd -n $nprocs $exename --code-coverage=$coverage_opt $(joinpath(testdir, f))`)
end
else
run(`$cmd -n $nprocs $exename --code-coverage=$coverage_opt $(joinpath(testdir, f))`)
end
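
To run the new threaded test on its own, something along these lines mirrors the `runtests.jl` logic (a sketch, assuming `MPI.mpiexec`, 2 ranks, and `test_threads.jl` in the current directory; adjust as needed):

```julia
using MPI

exename = joinpath(Sys.BINDIR, Base.julia_exename())

# 4 Julia threads per rank, as runtests.jl requests for test_threads.jl.
withenv("JULIA_NUM_THREADS" => "4") do
    MPI.mpiexec() do cmd
        run(`$cmd -n 2 $exename test_threads.jl`)
    end
end
```
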
38 changes: 38 additions & 0 deletions test/test_threads.jl
@@ -0,0 +1,38 @@
using Test, Pkg
using MPI

if get(ENV,"JULIA_MPI_TEST_ARRAYTYPE","") == "CuArray"
using CuArrays
ArrayType = CuArray
else
ArrayType = Array
end

provided = MPI.Init_thread(MPI.THREAD_MULTIPLE)

comm = MPI.COMM_WORLD
size = MPI.Comm_size(comm)
rank = MPI.Comm_rank(comm)

const N = 10

dst = mod(rank+1, size)
src = mod(rank-1, size)

if provided == MPI.THREAD_MULTIPLE
send_arr = collect(1.0:N)
recv_arr = zeros(N)

reqs = Array{MPI.Request}(undef, 2N)

Threads.@threads for i = 1:N
reqs[N+i] = MPI.Irecv!(@view(recv_arr[i:i]), src, i, comm)
reqs[i] = MPI.Isend(@view(send_arr[i:i]), dst, i, comm)
end

MPI.Waitall!(reqs)

@test recv_arr == send_arr
end

MPI.Finalize()
