Skip to content

Commit

Permalink
Single file multi-threading (read-only) (#477)
Browse files Browse the repository at this point in the history
* start parallel update

* Update JLD2.jl

* start parallel read capability

* file handle created for every parallel_read

* always lock

* remove commented tests

* Delete Manifest.toml

* delete temp file intests

* fixes & changelog

* slightly refined tests and relaxed condition

* remove accidental test.jld2

* use realpath

---------

Co-authored-by: Jonas Isensee <jonas.isensee@ds.mpg.de>
  • Loading branch information
ejmeitz and Jonas Isensee committed Jul 26, 2023
1 parent 6e36daa commit c47f45b
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 23 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
*.jl.cov
*.jl.mem

*.jld2
*.h5
*.nc
*.jld
/test/test_out.jld

docs/build/
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
## 0.4.35
## 0.4.33
- fix `Upgrade` for parametric types
- new type reconstruction when matching DataType cannot be found (eval-free)
- new `parallel_read` keyword for creating stand-alone file handles for multithreaded file reading (@ejmeitz)

## 0.4.32
- add experimental `JLD2.readas` function for customized reading of custom serialized objects (#468)
Expand Down
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name = "JLD2"
uuid = "033835bb-8acc-5ee8-8aae-3f567f8a3819"
version = "0.4.32"
version = "0.4.33"

[deps]
FileIO = "5789e2e9-d7fb-5bc7-8068-2c6fae9b9549"
Expand Down
55 changes: 35 additions & 20 deletions src/JLD2.jl
Original file line number Diff line number Diff line change
Expand Up @@ -303,40 +303,51 @@ function jldopen(fname::AbstractString, wr::Bool, create::Bool, truncate::Bool,
compress=false,
mmaparrays::Bool=false,
typemap::Dict{String}=Dict{String,Any}(),
parallel_read::Bool=false,
) where T<:Union{Type{IOStream},Type{MmapIO}}
mmaparrays && @warn "mmaparrays keyword is currently ignored" maxlog=1
verify_compressor(compress)
exists = ispath(fname)

# Can only open multiple in parallel if mode is "r"
if parallel_read && (wr, create, truncate) != (false, false, false)
throw(ArgumentError("Cannot open file in a parallel context unless mode is \"r\""))
end

lock(OPEN_FILES_LOCK)

f = try
if exists
rname = realpath(fname)
# catch existing file system entities that are not regular files
if !isfile(rname)
throw(ArgumentError("not a regular file: $fname"))
end
!isfile(rname) && throw(ArgumentError("not a regular file: $fname"))

# If in serial, return existing handle. In parallel always generate a new handle
if haskey(OPEN_FILES, rname)
ref = OPEN_FILES[rname]
f = ref.value
if !isnothing(f)
if truncate
throw(ArgumentError("attempted to truncate a file that was already open"))
elseif !isa(f, JLDFile{iotype})
throw(ArgumentError("attempted to open file with $iotype backend, but already open with a different backend"))
elseif f.writable != wr
current = wr ? "read/write" : "read-only"
previous = f.writable ? "read/write" : "read-only"
throw(ArgumentError("attempted to open file $(current), but file was already open $(previous)"))
elseif f.compress != compress
throw(ArgumentError("attempted to open file with compress=$(compress), but file was already open with compress=$(f.compress)"))
elseif f.mmaparrays != mmaparrays
throw(ArgumentError("attempted to open file with mmaparrays=$(mmaparrays), but file was already open with mmaparrays=$(f.mmaparrays)"))
if parallel_read
f.writable && throw(ArgumentError("Tried to open file in a parallel context but it is open in write-mode elsewhere in a serial context."))
else
if truncate
throw(ArgumentError("attempted to truncate a file that was already open"))
elseif !isa(f, JLDFile{iotype})
throw(ArgumentError("attempted to open file with $iotype backend, but already open with a different backend"))
elseif f.writable != wr
current = wr ? "read/write" : "read-only"
previous = f.writable ? "read/write" : "read-only"
throw(ArgumentError("attempted to open file $(current), but file was already open $(previous)"))
elseif f.compress != compress
throw(ArgumentError("attempted to open file with compress=$(compress), but file was already open with compress=$(f.compress)"))
elseif f.mmaparrays != mmaparrays
throw(ArgumentError("attempted to open file with mmaparrays=$(mmaparrays), but file was already open with mmaparrays=$(f.mmaparrays)"))
end

f = f::JLDFile{iotype}
f.n_times_opened += 1
return f
end

f = f::JLDFile{iotype}
f.n_times_opened += 1
return f
end
end
end
Expand All @@ -345,7 +356,11 @@ function jldopen(fname::AbstractString, wr::Bool, create::Bool, truncate::Bool,
created = !exists || truncate
rname = realpath(fname)
f = JLDFile(io, rname, wr, created, compress, mmaparrays)
OPEN_FILES[rname] = WeakRef(f)

if !parallel_read
OPEN_FILES[rname] = WeakRef(f)
end

f
catch e
rethrow(e)
Expand Down
44 changes: 44 additions & 0 deletions test/loadsave.jl
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,50 @@ JLD2.rconvert(::Type{CR}, dsa::CRSerialized) = CR(dsa.r)
end
end

# Test jldsave
@testset "Multi-threaded read" begin
fn = joinpath(mktempdir(), "test.jld2")

jldsave(fn; a=1, b=2)

#########################
# Valid access patterns #
#########################

#Normal read
jldopen(fn, "r"; parallel_read = true) do f
@test f["a"] == 1
@test f["b"] == 2
@test fn keys(JLD2.OPEN_FILES)
end

# Can read in parallel and serial (read-only)
f1 = jldopen(fn)
f2 = jldopen(fn; parallel_read = true)
@test JLD2.OPEN_FILES[realpath(fn)] == f1
@test f1 != f2
close(f1); close(f2)

f1 = jldopen(fn, "a")
@test_throws ArgumentError jldopen(fn; parallel_read = true)
close(f1)

###########################
# Invalid access patterns #
###########################

# Open for non-read in parallel context
@test_throws ArgumentError jldopen(fn, "w"; parallel_read = true) do f end
@test_throws ArgumentError jldopen(fn, "w+"; parallel_read = true) do f end
@test_throws ArgumentError jldopen(fn, "r+"; parallel_read = true) do f end
@test_throws ArgumentError jldopen(fn, "a+"; parallel_read = true) do f end
@test_throws ArgumentError jldopen(fn, "a"; parallel_read = true) do f end


rm(fn; force = true, recursive = true)
end


###################################################################################################
## `Upgrade` Tests
###################################################################################################
Expand Down

2 comments on commit c47f45b

@JonasIsensee
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JuliaRegistrator
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Registration pull request created: JuliaRegistries/General/88360

After the above pull request is merged, it is recommended that a tag is created on this repository for the registered package version.

This will be done automatically if the Julia TagBot GitHub Action is installed, or can be done manually through the github interface, or via:

git tag -a v0.4.33 -m "<description of version>" c47f45b0403f07b6fab24c80f4cf1de71527eafd
git push origin v0.4.33

Please sign in to comment.