Parallel io additions #83

Merged
merged 37 commits into from Jan 24, 2020

Commits
68c5f05
removed metadata md from loadData, panel set to nothing as default
oHunewald Jan 13, 2020
e7689e9
removed md from loadData call
oHunewald Jan 13, 2020
f9137be
removed md from loadData call
oHunewald Jan 13, 2020
a78fc10
overloaded function getTotalSize for single file load
oHunewald Jan 13, 2020
683e676
add function splitRange, overload generateIO for single file splitting
oHunewald Jan 13, 2020
a32e913
finished single file splitting
oHunewald Jan 13, 2020
d11b85b
include test single file splitting
oHunewald Jan 13, 2020
6d1e87e
tests for single file splitting and loading
oHunewald Jan 13, 2020
259a635
removed cc from trainGigaSOM and doEpoch in new version
oHunewald Jan 14, 2020
5720358
check for different types of panel, to subset the data while loading
oHunewald Jan 14, 2020
73d86d9
removed cc from function call
oHunewald Jan 14, 2020
2fb0b08
added transform as optional parameter
oHunewald Jan 14, 2020
a3a303f
set named arguments to false, code cleaning
oHunewald Jan 14, 2020
e90254b
added test functions for loadData
oHunewald Jan 14, 2020
f142f98
added test loadData
oHunewald Jan 14, 2020
32b31c1
add fcs manual loading as reference
oHunewald Jan 14, 2020
9445452
added named arguments as they are now false by default
oHunewald Jan 14, 2020
b77861b
commented functions
oHunewald Jan 15, 2020
fa55d68
use loadData as wrapper around file splitting and loading, added rang…
oHunewald Jan 15, 2020
492ac3f
added function header for generateIO for single file
oHunewald Jan 22, 2020
cab83b3
changed info to error
oHunewald Jan 22, 2020
9e03ca3
corrected function header loadData
oHunewald Jan 22, 2020
b516e6e
typo
oHunewald Jan 22, 2020
05b4165
removed time macro
oHunewald Jan 22, 2020
e5476d6
removed comments
oHunewald Jan 22, 2020
d8927b0
combined getTotalSize into one
oHunewald Jan 22, 2020
e596769
removed _
oHunewald Jan 22, 2020
842517b
forgot test marco
oHunewald Jan 22, 2020
67b699f
added transform:true to loadData
oHunewald Jan 22, 2020
ed54d72
combined tests
oHunewald Jan 22, 2020
bf9f266
removed redundant code
oHunewald Jan 23, 2020
80ffa98
merging splitrange and splitting functions
laurentheirendt Jan 24, 2020
d7fd17c
change of variable name
laurentheirendt Jan 24, 2020
80dfd34
documentation changes
laurentheirendt Jan 24, 2020
cc02bc3
merging the output functions
laurentheirendt Jan 24, 2020
6d01e50
rename load function
laurentheirendt Jan 24, 2020
a3c33f5
Merge pull request #2 from laurentheirendt/parallel-io-additions-lh
oHunewald Jan 24, 2020
13 changes: 5 additions & 8 deletions docs/tutorials/example_workflow.jl
@@ -21,19 +21,16 @@ nWorkers = 2
addprocs(nWorkers, topology=:master_worker)
@everywhere using GigaSOM

generateIO(dataPath, md, nWorkers, true, 1, true)

R = Vector{Any}(undef,nWorkers)

@time @sync for (idx, pid) in enumerate(workers())
@async R[idx] = fetch(@spawnat pid loadData(idx, "input-$idx.jls", md, panel))
end
# R: array of references to the data loaded by each worker
# use '_' (or just "R, ") to ignore the second return value,
# which is used later for indexing the data files
R, _ = loadData(dataPath, md, nWorkers, panel=panel, reduce=true, transform=true)

som = initGigaSOM(R, 10, 10)

cc = map(Symbol, vcat(lineageMarkers, functionalMarkers))

@time som = trainGigaSOM(som, R, cc)
@time som = trainGigaSOM(som, R)

winners = mapToGigaSOM(som, R)

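Taken together, the updated tutorial workflow now fits in a handful of calls. A minimal sketch, assuming `dataPath`, `md`, and `panel` are defined as earlier in the tutorial:

```julia
using Distributed

nWorkers = 2
addprocs(nWorkers, topology=:master_worker)
@everywhere using GigaSOM

# loadData now wraps file splitting and parallel loading;
# the second return value (row ranges) is ignored here
R, _ = loadData(dataPath, md, nWorkers, panel=panel, reduce=true, transform=true)

som = initGigaSOM(R, 10, 10)
som = trainGigaSOM(som, R)      # cc is no longer passed explicitly
winners = mapToGigaSOM(som, R)
```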
6 changes: 3 additions & 3 deletions src/core.jl
@@ -125,7 +125,7 @@ end
- `radiusFun`: Function that generates radius decay, e.g. `linearRadius` or `expRadius(10.0)`
- `epochs`: number of SOM training iterations (default 10)
"""
function trainGigaSOM(som::Som, trainRef::Array{Any,1}, cc;
function trainGigaSOM(som::Som, trainRef::Array{Any,1};
kernelFun::Function = gaussianKernel,
metric = Euclidean(),
knnTreeFun = BruteTree,
@@ -155,7 +155,7 @@ function trainGigaSOM(som::Som, trainRef::Array{Any,1}, cc;
for (idx, pid) in enumerate(workers())
@async begin
# @info pid
R[idx] = fetch(@spawnat pid begin doEpoch(trainRef[idx], codes, tree, cc) end)
R[idx] = fetch(@spawnat pid begin doEpoch(trainRef[idx], codes, tree) end)
globalSumNumerator += R[idx][1]
globalSumDenominator += R[idx][2]
end
@@ -275,7 +275,7 @@ vectors and the adjustment in radius after each epoch.
- `codes`: Codebook
- `tree`: knn-compatible tree built upon the codes
"""
function doEpoch(x::Ref, codes::Array{Float64, 2}, tree, cc)
function doEpoch(x::Ref, codes::Array{Float64, 2}, tree)

# initialise numerator and denominator with 0's
sumNumerator = zeros(Float64, size(codes))
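The net effect of the core.jl change: the clustering columns (`cc`) are resolved once at load time inside `loadDataFile`, so the training entry points no longer take them. A before/after sketch of the call site:

```julia
# before: clustering columns passed through training
cc = map(Symbol, vcat(lineageMarkers, functionalMarkers))
som = trainGigaSOM(som, R, cc)

# after: the data behind each reference in R is already reduced
# to the panel columns, so training needs only the references
som = trainGigaSOM(som, R)
```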
96 changes: 78 additions & 18 deletions src/io/input.jl
@@ -69,40 +69,99 @@ function readFlowFrame(filename::String)
end

"""
loadData(idx, fn, md,panel; method = "asinh", cofactor = 5,
reduce = true, sort = true)
loadData(dataPath, data, nWorkers; panel=Nothing(),
type = "fcs", method = "asinh", cofactor = 5,
reduce = false, sort = false, transform = false)

This function has two parts. Part 1 generates the temporary binary files to be loaded by the
workers; the input data is divided equally into n parts according to the number of workers.
Part 2: each worker independently loads its own data package in parallel and returns a reference to it.

# Arguments:
- `dataPath`: path to data folder
- `data`: a single filename (String) or a metadata DataFrame with a `sample_name` column
- `panel`: Panel table with a column for Lineage Markers and one for Functional Markers,
or an `Array{Int}` of column indices, default: Nothing()
- `type`: String, type of datafile, default FCS
- `method`: transformation method, default arcsinh, optional
- `cofactor`: Cofactor for transformation, default 5, optional
- `reduce`: select only the columns defined by the lineage and functional markers, optional,
default: false. If false, check for any empty columns to be removed (such columns can appear
after concatenating FCS files, as well as parameters like time and event length)
- `sort`: sort columns by name to ensure a consistent order when concatenating the dataframes, optional, default: false
- `transform`: Boolean to indicate if the data will be transformed according to `method`, default: false
"""
function loadData(dataPath, data, nWorkers; panel=Nothing(),
type = "fcs", method = "asinh", cofactor = 5,
reduce = false, sort = false, transform = false)

xRange = generateIO(dataPath, data, nWorkers, true, 1, true)

R = Vector{Any}(undef,nWorkers)

# Load the data by each worker
# Without a panel, all columns are loaded:
# loadDataFile(idx, "input-$idx.jls", ...)
# Columns can be selected by an array of indices:
# loadDataFile(idx, "input-$idx.jls", [3:6;9:11], ...) <- this will concatenate the ranges into an array
# Please note that all optional arguments are "false" by default
if type == "fcs"
@sync for (idx, pid) in enumerate(workers())
@async R[idx] = fetch(@spawnat pid loadDataFile(idx, "input-$idx.jls", panel, method,
cofactor,reduce, sort, transform))
end
else
@error "File Type not yet supported!"
end

return R, xRange

end
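For illustration, a hedged sketch of the three accepted `panel` forms; the file name `sample.fcs` is made up:

```julia
# 1) panel as a DataFrame with lineage/functional marker columns
R, xRange = loadData(dataPath, md, nWorkers, panel=panel, reduce=true, transform=true)

# 2) panel as column indices ([3:6; 9:11] concatenates the ranges
#    into an Array{Int64,1})
R, xRange = loadData(dataPath, "sample.fcs", nWorkers, panel=[3:6; 9:11])

# 3) no panel: all columns are used for clustering
R, xRange = loadData(dataPath, "sample.fcs", nWorkers)
```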

"""
loadDataFile(idx, fn, panel, method, cofactor, reduce, sort, transform)

Load the data in parallel on each worker. Returns a reference to the loaded data.

# Arguments:
- `idx`: worker index
- `fn`: filename
- `md`: Metadata table
- `panel`: Panel table with a column for Lineage Markers and one for Functional Markers
- `panel`: Panel table with a column for Lineage Markers and one for Functional Markers,
or an `Array{Int}` of column indices
- `method`: transformation method, default arcsinh, optional
- `cofactor`: Cofactor for transformation, default 5, optional
- `reduce`: select only the columns defined by the lineage and functional markers.
If false, check for any empty columns to be removed (such columns can appear
after concatenating FCS files, as well as parameters like time and event length)
- `sort`: sort columns by name to ensure a consistent order when concatenating the dataframes
- `transform`: Boolean to indicate if the data will be transformed according to `method`
"""
function loadData(idx, fn, md, panel; method = "asinh", cofactor = 5,
reduce = true, sort = true)
function loadDataFile(idx, fn, panel, method, cofactor, reduce, sort, transform)

y = open(deserialize, fn)
fcsRaw = y[idx]
cleanNames!(fcsRaw)

# extract lineage markers
lineageMarkers, functionalMarkers = getMarkers(panel)

cc = map(Symbol, vcat(lineageMarkers, functionalMarkers))
# markers can be lineage and functional at the same time
# therefore make cc unique
unique!(cc)
fcsData = y[idx]
cleanNames!(fcsData)

# Define the clustering column by range object
if typeof(panel) == Array{Int64,1}
cc = panel
elseif typeof(panel) == DataFrame
# extract lineage markers
lineageMarkers, functionalMarkers = getMarkers(panel)
cc = map(Symbol, vcat(lineageMarkers, functionalMarkers))
# markers can be lineage and functional at the same time
# therefore make cc unique
unique!(cc)
else
# If no panel is provided, use all column names as cc
# and set reduce to false
cc = map(Symbol, names(fcsData))
end

fcsData = transformData(fcsRaw, method, cofactor)
fcsData = sortReduce(fcsData, cc, reduce, sort)
if transform
fcsData = transformData(fcsData, method, cofactor)
end
sortReduce(fcsData, cc, reduce, sort)

# get the sample_id from md
# return value is an array with only one entry -> take [1]
@@ -118,3 +118,4 @@ function loadData(idx, fn, md, panel; method = "asinh", cofactor = 5,

return (dfallRef)
end

106 changes: 92 additions & 14 deletions src/satellites.jl
@@ -184,11 +184,15 @@ and at the given location.
- `inSize`: Vector with the lengths of each file within the input data set
- `runSum`: Running sum of the `inSize` vector (`runSum[end] == totalSize`)
"""
function getTotalSize(loc, md, printLevel=0)
function getTotalSize(loc, md::Any, printLevel=0)
global totalSize, tmpSum

# define the file names
fileNames = sort(md.file_name)
if typeof(md) == String
fileNames = [md]
else
# define the file names
fileNames = sort(md.file_name)
end

# output the number of files
if printLevel > 0
@@ -231,6 +235,7 @@ function getTotalSize(loc, md, printLevel=0)
return totalSize, inSize, runSum
end


"""
splitting(totalSize, nWorkers, printLevel=0)

@@ -250,7 +255,10 @@ given the total size and the number of workers
"""
function splitting(totalSize, nWorkers, printLevel=0)
# determine the size per file
fileL = Int(floor(totalSize/nWorkers))
fileL = div(totalSize, nWorkers)

# determine the remainder
extras = rem(totalSize,nWorkers)

# determine the size of the last (residual) file
lastFileL = Int(fileL + totalSize - nWorkers * fileL)
@@ -262,7 +270,21 @@ function splitting(totalSize, nWorkers, printLevel=0)
@info " > Total row count: $totalSize cells"
end

return fileL, lastFileL
# determine the ranges
nchunks = fileL > 0 ? nWorkers : extras
chunks = Vector{UnitRange{Int}}(undef, nchunks)
lo = 1
for i in 1:nchunks
hi = lo + fileL - 1
if extras > 0
hi += 1
extras -= 1
end
chunks[i] = lo:hi
lo = hi+1
end

return fileL, lastFileL, chunks
end
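As a sanity check on the chunking loop above, a standalone sketch; `chunkRanges` is a hypothetical helper, not part of the package. Ten rows over three workers yield `1:4`, `5:7`, `8:10`.

```julia
# mirrors the range computation in `splitting`
function chunkRanges(totalSize::Int, nWorkers::Int)
    fileL  = div(totalSize, nWorkers)  # base number of rows per worker
    extras = rem(totalSize, nWorkers)  # leftover rows, handed out one by one
    chunks = UnitRange{Int}[]
    lo = 1
    for _ in 1:(fileL > 0 ? nWorkers : extras)
        hi = lo + fileL - 1
        if extras > 0
            hi += 1
            extras -= 1
        end
        push!(chunks, lo:hi)
        lo = hi + 1
    end
    return chunks
end

chunkRanges(10, 3)  # 3-element Vector: 1:4, 5:7, 8:10
```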

"""
@@ -449,7 +471,7 @@ Generate binary .jls files given a path to files, their metadata, and the number
# INPUTS

- `filePath`: path to the files
- `md`: Metadata table
- `md`: Metadata table, or single file String
- `nWorkers`: number of workers
- `generateFiles`: Boolean to actually generate files
- `printLevel`: Verbose level (0: mute)
@@ -465,7 +487,7 @@ if `generateFiles` is `true`:
`nWorkers` files named `input-<workerID>.jls` saved in the directory `filePath`.

"""
function generateIO(filePath, md, nWorkers, generateFiles=true, printLevel=0, saveIndices=false)
function generateIO(filePath, md::DataFrame, nWorkers, generateFiles=true, printLevel=0, saveIndices=false)

# determine the total size, the vector of sizes, and their running sum
totalSize, inSize, runSum = getTotalSize(filePath, md, printLevel)
@@ -506,19 +528,54 @@ function generateIO(filePath, md::DataFrame, nWorkers, generateFiles=true, printLevel=0, saveIndices=false)
end

# output the file per worker
if generateFiles
open(f -> serialize(f,out), "input-$worker.jls", "w")
if printLevel > 0
printstyled("[ Info: > File input-$worker.jls written.\n", color=:green, bold=true)
end
end
outputFile(out, "input-$worker.jls", generateFiles)
end

if saveIndices
return localStartVect, localEndVect
end
end

"""
generateIO(filePath, fn::String, nWorkers, generateFiles=true, printLevel=0, saveIndices=false)

Generate binary .jls files for a single file given a path and the number of workers

# INPUTS

- `filePath`: path to the data folder
- `fn`: file name
- `nWorkers`: number of workers
- `generateFiles`: Boolean to actually generate files
- `printLevel`: Verbose level (0: mute)
- `saveIndices`: Boolean to save the local indices

# OUTPUTS

if `saveIndices` is `true`:
- `chunks`: vector of row ranges, one per worker

if `generateFiles` is `true`:
- `nWorkers` files named `input-<workerID>.jls` saved in the directory `filePath`.

"""
function generateIO(filePath, fn::String, nWorkers, generateFiles=true, printLevel=0, saveIndices=false)

# read the single file and split it according to the number of workers.
inFile = readFlowFrame(filePath * Base.Filesystem.path_separator * fn)
_, _, chunks = splitting(size(inFile, 1), nWorkers, 0)

for i in 1:length(chunks)
out = Dict()
out[i] = inFile[chunks[i], :]
outputFile(out, "input-$i.jls", generateFiles)
end

if saveIndices
return chunks
end
end
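A short usage sketch of the single-file method (the file name is hypothetical); with `saveIndices=true` it returns the row ranges computed by `splitting`:

```julia
# split one FCS file into nWorkers serialized packages,
# input-1.jls ... input-<nWorkers>.jls, and keep the row ranges
chunks = generateIO(dataPath, "sample.fcs", nWorkers, true, 0, true)
```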

"""
rmFile(fileName, printLevel = 1)

Expand All @@ -529,7 +586,7 @@ Remove a file.
- `fileName`: name of file to be removed
- `printLevel`: Verbose level (0: mute)
"""
function rmFile(fileName, printLevel = 1)
function rmFile(fileName, printLevel=0)
try
if printLevel > 0
printstyled("> Removing $fileName ... ", color=:yellow)
@@ -543,4 +600,25 @@ function rmFile(fileName, printLevel = 1)
printstyled("(file $fileName does not exist - skipping).\n", color=:red)
end
end
end

"""
outputFile(out, fileName, generateFiles=true, printLevel=0)

Generate a file given a name and content.

# INPUTS

- `out`: content of the file
- `fileName`: name of the file to be written
- `generateFiles`: Boolean to actually generate files
- `printLevel`: Verbose level (0: mute)
"""
function outputFile(out, fileName, generateFiles=true, printLevel=0)
if generateFiles
open(f -> serialize(f,out), fileName, "w")
if printLevel > 0
printstyled("[ Info: > File $fileName written.\n", color=:green, bold=true)
end
end
end
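The packages written by `outputFile` are read back on the workers with `open(deserialize, fn)`, as in `loadDataFile`. A minimal round-trip sketch with a toy payload:

```julia
using Serialization

out = Dict(1 => [1.0 2.0; 3.0 4.0])   # toy data package for worker 1
outputFile(out, "input-1.jls", true)  # serialize to disk
y = open(deserialize, "input-1.jls")  # what loadDataFile does
y[1]                                  # worker 1's share of the data
rmFile("input-1.jls")                 # clean up the temporary file
```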
2 changes: 2 additions & 0 deletions test/runtests.jl
@@ -12,6 +12,8 @@ checkDir()
include("testSatellites.jl")
include("testSplitting.jl")
include("testTrainingOuputEquality.jl")
include("testSingleFileSplitting.jl")
include("testLoadData.jl")
end

cd(owd)