Commit

Support mapreduce with one worker per node (#10)
* one worker per node

* Add examples

* version bump to v0.8.1
jishnub authored Apr 5, 2021
1 parent f7c3163 commit 41e052f
Showing 17 changed files with 524 additions and 39 deletions.
2 changes: 1 addition & 1 deletion Project.toml
@@ -1,7 +1,7 @@
name = "ParallelUtilities"
uuid = "fad6cfc8-4f83-11e9-06cc-151124046ad0"
authors = ["Jishnu Bhattacharya"]
version = "0.8.0"
version = "0.8.1"

[deps]
DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8"
5 changes: 5 additions & 0 deletions docs/make.jl
@@ -16,6 +16,11 @@ makedocs(;
pages=[
"ParallelUtilities" => "index.md",
"Mapreduce" => "pmapreduce.md",
"Examples" => [
"pmapreduce" => "examples/pmapreduce.md",
"SharedArrays" => "examples/sharedarrays.md",
"Threads" => "examples/threads.md",
],
"ClusterQueryUtils" => "clusterquery.md",
"Reference" => "api.md",
],
1 change: 1 addition & 0 deletions docs/src/examples/pmapreduce.jl
60 changes: 60 additions & 0 deletions docs/src/examples/pmapreduce.md
@@ -0,0 +1,60 @@
# Example of the use of pmapreduce

The function `pmapreduce` performs a parallel `mapreduce`. This is primarily useful when the mapped function has to perform an expensive calculation, that is, when the evaluation time per core exceeds the setup and communication time. It is also useful when each core is allocated its own memory and works with arrays that would not fit into memory collectively, as is often the case on a cluster.

We walk through an example where we initialize and concatenate arrays in serial and in parallel.

We load the necessary modules first

```julia
using ParallelUtilities
using Distributed
```

We define the function that performs the initialization on each core. This step is embarrassingly parallel, as no communication happens between workers. We simulate an expensive calculation by adding a sleep interval for each index.

```julia
function initialize(sleeptime)
    A = Array{Int}(undef, 20, 20)
    for ind in eachindex(A)
        sleep(sleeptime)
        A[ind] = ind
    end
    return A
end
```

Next we define the function that calls `pmapreduce`:

```julia
function main_pmapreduce(sleeptime)
    pmapreduce(x -> initialize(sleeptime), hcat, 1:20)
end
```

We also define a function that carries out a serial mapreduce:

```julia
function main_mapreduce(sleeptime)
    mapreduce(x -> initialize(sleeptime), hcat, 1:20)
end
```
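
The timing comparison is driven by a small function from the full script included in this commit (`examples/pmapreduce.jl`), which precompiles both versions before timing them:

```julia
function compare_with_serial()
    # precompile
    main_mapreduce(0)
    main_pmapreduce(0)

    # time
    println("Testing serial")
    A = @time main_mapreduce(5e-6)
    println("Testing parallel")
    B = @time main_pmapreduce(5e-6)

    # check results
    println("Results match : ", A == B)
end
```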

We compare the performance of the serial and parallel evaluations using 20 cores on one node:

```julia
julia> compare_with_serial()
Testing serial
9.457601 seconds (40.14 k allocations: 1.934 MiB)
Testing parallel
0.894611 seconds (23.16 k allocations: 1.355 MiB, 2.56% compilation time)
Results match : true
```

The full script may be found [here](pmapreduce.jl). To run this, use

```julia
julia> @everywhere include("pmapreduce.jl")

julia> PMapReduceTiming.compare_with_serial()
```
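
Note that `@everywhere` and `pmapreduce` act on the available worker processes, so workers need to be added first. A minimal sketch for a single node (the worker count of 20 mirrors the timing run above and is only illustrative):

```julia
using Distributed
addprocs(20)  # illustrative: add 20 local worker processes

@everywhere include("pmapreduce.jl")
PMapReduceTiming.compare_with_serial()
```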
1 change: 1 addition & 0 deletions docs/src/examples/sharedarrays.jl
114 changes: 114 additions & 0 deletions docs/src/examples/sharedarrays.md
@@ -0,0 +1,114 @@
# Using SharedArrays in a parallel mapreduce

One might want to carry out a computation across several nodes of a cluster, where each node works on its own shared array. This may be achieved using a `WorkerPool` that consists of one worker per node; each such worker acts as a root process that launches tasks on its node and eventually returns the node-local array for an overall reduction across nodes.
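
The package provides `ParallelUtilities.workerpool_nodes()` to construct such a pool, as used below. Conceptually, it is simply a `WorkerPool` wrapping one worker id per node; a minimal sketch (the worker ids 2 and 30 are hypothetical, assuming they happen to be the first workers on two different nodes):

```julia
using Distributed

# hypothetical ids: assume workers 2 and 30 are the first workers on two separate nodes
pool_manual = WorkerPool([2, 30])
```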

We walk through one such example where we concatenate arrays that are locally initialized on each node.

We load the packages necessary, in this case these are `ParallelUtilities`, `SharedArrays` and `Distributed`.

```julia
using ParallelUtilities
using SharedArrays
using Distributed
```

We create a function to initialize the local part on each worker. In this case we simulate a heavy workload by adding a `sleep` period. In other words, we assume that the individual elements of the array are expensive to evaluate. We obtain the local indices of the `SharedArray` through the function `localindices`.

```julia
function initialize_localpart(s, sleeptime)
    for ind in localindices(s)
        sleep(sleeptime)
        s[ind] = ind
    end
end
```

We create a function that runs on the root worker on each node and feeds tasks to other workers on that node. We use the function `ParallelUtilities.workers_myhost()` to obtain a list of all workers on the same node. We create the `SharedArray` on these workers so that it is entirely contained on one machine. This is achieved by passing the keyword argument `pids` to the `SharedArray` constructor. We asynchronously spawn tasks to initialize the local parts of the shared array on each worker.

```julia
function initializenode_sharedarray(sleeptime)
    # obtain the workers on the local machine
    pids = ParallelUtilities.workers_myhost()

    # Create a shared array spread across the workers on that node
    s = SharedArray{Int}((2_000,), pids = pids)

    # spawn remote tasks to initialize the local part of the shared array
    @sync for p in pids
        @spawnat p initialize_localpart(s, sleeptime)
    end
    return sdata(s)
end
```

We create a main function that runs on the calling process and concatenates the arrays on each node. This is run on a `WorkerPool` consisting of one worker per node which acts as the root process. We may obtain such a pool through the function `ParallelUtilities.workerpool_nodes()`. Finally we call `pmapreduce` with a mapping function that initializes an array on each node, which is followed by a concatenation across the nodes.

```julia
function main_sharedarray(sleeptime)
    # obtain the workerpool with one process on each node
    pool = ParallelUtilities.workerpool_nodes()

    # obtain the number of workers in the pool
    nw_node = nworkers(pool)

    # Evaluate the parallel mapreduce
    pmapreduce(x -> initializenode_sharedarray(sleeptime), hcat, pool, 1:nw_node)
end
```

We compare the results with a serial execution that uses a similar workflow, except we use `Array` instead of `SharedArray` and `mapreduce` instead of `pmapreduce`.

```julia
function initialize_serial(sleeptime)
    pids = ParallelUtilities.workers_myhost()
    s = Array{Int}(undef, 2_000)
    for ind in eachindex(s)
        sleep(sleeptime)
        s[ind] = ind
    end
    return sdata(s)
end

function main_serial(sleeptime)
    pool = ParallelUtilities.workerpool_nodes()
    nw_node = nworkers(pool)
    mapreduce(x -> initialize_serial(sleeptime), hcat, 1:nw_node)
end
```

We create a function to compare the performance of the two. We start with a precompilation run with no sleep time, followed by recording the actual timings.

```julia
function compare_with_serial()
    # precompile
    main_serial(0)
    main_sharedarray(0)

    # time
    println("Testing serial")
    A = @time main_serial(5e-3)
    println("Testing sharedarray")
    B = @time main_sharedarray(5e-3)

    println("Results match : ", A == B)
end
```

We run this script on a Slurm cluster across 2 nodes with 28 cores on each node. The results are:

```julia
julia> compare_with_serial()
Testing serial
24.624912 seconds (27.31 k allocations: 1.017 MiB)
Testing sharedarray
1.077752 seconds (4.60 k allocations: 246.281 KiB)
Results match : true
```

The full script may be found [here](sharedarrays.jl). To run this, use

```julia
julia> @everywhere include("sharedarrays.jl")

julia> SharedArraysTiming.compare_with_serial()
```
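
These run instructions assume that worker processes have already been launched across both nodes. A minimal sketch of one way to do this from within a Slurm allocation, assuming the external SlurmClusterManager package (not part of ParallelUtilities):

```julia
using Distributed
using SlurmClusterManager  # assumed external package

# launch one worker per Slurm task in the current allocation
addprocs(SlurmManager())

@everywhere include("sharedarrays.jl")
SharedArraysTiming.compare_with_serial()
```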
1 change: 1 addition & 0 deletions docs/src/examples/threads.jl
95 changes: 95 additions & 0 deletions docs/src/examples/threads.md
@@ -0,0 +1,95 @@
# Using Threads in a parallel mapreduce

One might want to carry out a computation across several nodes of a cluster, where each node uses multithreading to evaluate a result that is subsequently reduced across all nodes. We walk through one such example where we concatenate arrays that are locally initialized on each node using threads.

We load the packages necessary, in this case these are `ParallelUtilities` and `Distributed`.

```julia
using ParallelUtilities
using Distributed
```

We create a function to initialize the local part on each worker. In this case we simulate a heavy workload by adding a `sleep` period. In other words, we assume that the individual elements of the array are expensive to evaluate. We use `Threads.@threads` to split up the loop into sections that are processed on individual threads.

```julia
function initializenode_threads(sleeptime)
    s = zeros(Int, 2_000)
    Threads.@threads for ind in eachindex(s)
        sleep(sleeptime)
        s[ind] = ind
    end
    return s
end
```

We create a main function that runs on the calling process and launches the array initialization task on each node. This is run on a `WorkerPool` consisting of one worker per node which acts as the root process. We may obtain such a pool through the function `ParallelUtilities.workerpool_nodes()`. The array creation step on each node is followed by an eventual concatenation.

```julia
function main_threads(sleeptime)
    # obtain the workerpool with one process on each node
    pool = ParallelUtilities.workerpool_nodes()

    # obtain the number of workers in the pool
    nw_nodes = nworkers(pool)

    # Evaluate the parallel mapreduce
    pmapreduce(x -> initializenode_threads(sleeptime), hcat, pool, 1:nw_nodes)
end
```

We compare the results with a serial execution that uses a similar workflow, except we use `mapreduce` instead of `pmapreduce` and do not use threads.

```julia
function initialize_serial(sleeptime)
    s = zeros(Int, 2_000)
    for ind in eachindex(s)
        sleep(sleeptime)
        s[ind] = ind
    end
    return s
end

function main_serial(sleeptime)
    pool = ParallelUtilities.workerpool_nodes()
    nw_nodes = nworkers(pool)
    mapreduce(x -> initialize_serial(sleeptime), hcat, 1:nw_nodes)
end
```

We create a function to compare the performance of the two. We start with a precompilation run with no sleep time, followed by recording the actual timings with a sleep time of 5 milliseconds for each index of the array.

```julia
function compare_with_serial()
    # precompile
    main_serial(0)
    main_threads(0)

    # time
    println("Testing serial")
    A = @time main_serial(5e-3)
    println("Testing threads")
    B = @time main_threads(5e-3)

    println("Results match : ", A == B)
end
```

We run this script on a Slurm cluster across 2 nodes with 28 cores on each node. The results are:

```julia
julia> compare_with_serial()
Testing serial
24.601593 seconds (22.49 k allocations: 808.266 KiB)
Testing threads
0.666256 seconds (3.71 k allocations: 201.703 KiB)
Results match : true
```

The full script may be found [here](threads.jl). To run this, use

```julia
julia> @everywhere include("threads.jl")

julia> ThreadsTiming.compare_with_serial()
```
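
For `Threads.@threads` to have any effect on the workers, each worker process must itself be started with multiple threads. A minimal local sketch, with illustrative counts chosen so that the number of processes times the number of threads matches the available cores:

```julia
using Distributed

# illustrative: 2 worker processes, each started with 4 threads
addprocs(2; exeflags = "--threads=4")

@everywhere include("threads.jl")
ThreadsTiming.compare_with_serial()
```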

2 changes: 1 addition & 1 deletion docs/src/index.md
@@ -105,4 +105,4 @@ This performance gap might reduce in the future.

1. This package currently does not implement a specialized `mapreduce` for arrays, so the behavior might differ for specialized array argument types (e.g. `DistributedArray`s). This might change in the future.

2. This package deals with distributed (multi-core) parallelism, and at this moment it has not been tested alongside multi-threading.
2. This package deals with distributed (multi-core) parallelism, and at this point it has not been tested extensively alongside multi-threading. The combination of multithreading and multiprocessing has been tested in configurations where the number of threads times the number of processes equals the number of available cores. See [an example](examples/threads.md) of this usage, where each node uses threads locally and the reduction across nodes is performed using multiprocessing.
38 changes: 38 additions & 0 deletions examples/pmapreduce.jl
@@ -0,0 +1,38 @@
module PMapReduceTiming

using ParallelUtilities
using Distributed

function initialize(sleeptime)
    A = Array{Int}(undef, 20, 20)
    for ind in eachindex(A)
        sleep(sleeptime)
        A[ind] = ind
    end
    return A
end

function main_mapreduce(sleeptime)
    mapreduce(x -> initialize(sleeptime), hcat, 1:20)
end

function main_pmapreduce(sleeptime)
    pmapreduce(x -> initialize(sleeptime), hcat, 1:20)
end

function compare_with_serial()
    # precompile
    main_mapreduce(0)
    main_pmapreduce(0)

    # time
    println("Testing serial")
    A = @time main_mapreduce(5e-6)
    println("Testing parallel")
    B = @time main_pmapreduce(5e-6)

    # check results
    println("Results match : ", A == B)
end

end

2 comments on commit 41e052f


@jishnub (owner, author) commented on 41e052f, Apr 5, 2021


@JuliaRegistrator

Registration pull request created: JuliaRegistries/General/33572

After the above pull request is merged, it is recommended that a tag is created on this repository for the registered package version.

This will be done automatically if the Julia TagBot GitHub Action is installed, or can be done manually through the GitHub interface, or via:

```
git tag -a v0.8.1 -m "<description of version>" 41e052fd623802647f86a0601e287f0cd4f578e2
git push origin v0.8.1
```
