Distributed Majority Voting Algorithm in Julia
1. Introduction
The original majority voting problem asks for the element that appears more than half of the time in a given array; if no such majority element exists, the algorithm should return an empty result.
There are extended versions of this problem, such as LeetCode's Majority Element II. We can consider a general form: for an array of length n and an integer k >= 2, find all elements that appear more than ⌊n/k⌋ times.
In this post, I will give both a sequential and a distributed algorithm for this general form of the majority voting problem, along with a complexity analysis. I will also use this algorithm as an example to explore how to write parallel computing programs in Julia. It turns out to be a very pleasant journey.
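As a toy illustration of the general form (my own example, not part of the benchmark data): take k = 3 and an array of length 9, so the answer should contain every element that appears strictly more than 9 ÷ 3 = 3 times.
A = [1, 1, 1, 1, 2, 2, 2, 3, 4]
# 1 appears 4 > 3 times, so it is a majority element for k = 3;
# 2 appears exactly 3 times, which is not strictly more than 9 ÷ 3.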
2. Sequential Boyer-Moore algorithm
The brute force solution could be sorting the array, or counting the appearances of all elements, like Python's collections.Counter. However, that would cost either O(n log n) time or O(n) extra space.
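For reference, a counting baseline might look like the following sketch (the name brute_force_majority and the code are mine, not from the original post):
function brute_force_majority(A::Vector{T}, k::Int=2)::Vector{T} where T
    # Count every element: O(n) time, but O(n) extra space in the worst case.
    counts = Dict{T, Int}()
    for a in A
        counts[a] = get(counts, a, 0) + 1
    end
    # Keep the elements that appear strictly more than n ÷ k times.
    bar = div(length(A), k) + 1
    return [key for (key, v) in counts if v >= bar]
end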
I will not elaborate on the original algorithm, since this post is about the general form. For a given k, we maintain at most k - 1 candidates with counters: a matching (or newly admitted) element increments its counter, while a non-matching element decrements every candidate's counter, evicting candidates that reach zero. Any element appearing more than ⌊n/k⌋ times must survive as a candidate, and a second pass verifies the actual counts.
function BoyerMoore(A::AbstractArray{T, 1}, k::Int=2)::Dict{T, Int} where T
    candidates = Dict{T, Int}()
    for a in A
        if length(candidates) < k - 1 || haskey(candidates, a)
            # There is still room, or `a` is already a candidate: count it.
            candidates[a] = get!(candidates, a, 0) + 1
        else
            # No room and `a` is not a candidate: decrement every counter
            # and evict the candidates that drop to zero.
            to_del = Vector{T}()
            for key in keys(candidates)
                candidates[key] -= 1
                candidates[key] <= 0 && push!(to_del, key)
            end
            for key in to_del
                pop!(candidates, key)
            end
        end
    end
    return candidates
end
function majority_element(A::Vector{T}, k::Int=2)::Vector{T} where T
    @assert k >= 2 "k must be an integer no less than 2"
    candidates = BoyerMoore(A, k)
    # Second pass: verify the actual counts of the surviving candidates.
    for key in keys(candidates)
        candidates[key] = 0
    end
    for a in A
        haskey(candidates, a) && (candidates[a] += 1)
    end
    bar = div(length(A), k) + 1
    return [key for (key, v) in candidates if v >= bar]
end
The potential updating time for candidates in each lookup is O(k) in the worst case (a non-matching element decrements all k - 1 counters), so the first pass costs O(nk) time with O(k) extra space. The verification pass adds another O(n), giving O(nk) in total.
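As a quick sanity check on the toy array from the introduction (assuming the functions above are defined in the session):
A = [1, 1, 1, 1, 2, 2, 2, 3, 4]
majority_element(A, 3)   # -> [1]: only 1 appears more than 9 ÷ 3 = 3 times
majority_element(A)      # -> Int64[]: nothing appears more than 9 ÷ 2 = 4 times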
3. Distributed majority voting algorithm
The majority element property can easily be extended to a distributed version. The candidates obtained from each sub-array can be viewed as a simplified “counter”, and we can merge them to get the global candidates.
I will give both multi-processing and multi-threading approaches in Julia.
The only tricky part is the merge phase. When a new, distinct element shows up and there is no room left, instead of decreasing every candidate's count by one, we have to subtract the current minimum count, taken over the existing candidates and the new element's own count. See merge_candidates! below.
Multi-processing
With the help of SharedArrays and the @distributed macro, the parallel code in Julia is very neat.
@everywhere function merge_candidates!(X::Dict{T, Int}, Y::Dict{T, Int}, k::Int=2) where T
    for (key, v) in Y
        if length(X) < k - 1 || haskey(X, key)
            X[key] = get!(X, key, 0) + v
        else
            # No room for a new candidate: subtract the minimum surviving
            # count (over X and the new entry) from everyone.
            min_v = min(minimum(values(X)), v)
            to_del = Vector{T}()
            for a in keys(X)
                X[a] -= min_v
                X[a] <= 0 && push!(to_del, a)
            end
            for a in to_del
                pop!(X, a)
            end
            v > min_v && (X[key] = v - min_v)
        end
    end
    return X
end
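A toy illustration of that subtraction step (the dictionaries below are made-up values, assuming merge_candidates! above is defined); with k = 3 we keep at most k - 1 = 2 candidates.
X = Dict(1 => 3, 2 => 1)
Y = Dict(3 => 2)
merge_candidates!(X, Y, 3)
# Key 3 is new and there is no room, so min(1, 2) = 1 is subtracted from
# everything: candidate 2 drops to zero and is evicted, 1 keeps count 2,
# and 3 enters with count 2 - 1 = 1, i.e. X becomes Dict(1 => 2, 3 => 1).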
function distributed_majority_element(A::Vector{T}, p::Int, k::Int=2)::Vector{T} where T
    @assert k >= 2 "k must be an integer no less than 2"
    n = length(A)
    step = n ÷ p
    A = SharedVector(A)
    # Phase 1: run BoyerMoore on each chunk and merge the per-chunk
    # candidates (the reducer must receive k, not its default of 2).
    candidates = @distributed ((x, y) -> merge_candidates!(x, y, k)) for i = 1:p
        left = (i - 1) * step + 1
        right = i == p ? n : i * step
        BoyerMoore(view(A, left:right), k)
    end
    # Phase 2: count the global candidates in parallel and sum the counters.
    global_counter = @distributed mergewith(+) for i = 1:p
        counter = Dict(key => 0 for (key, v) in candidates)
        left = (i - 1) * step + 1
        right = i == p ? n : i * step
        for a in view(A, left:right)
            haskey(counter, a) && (counter[a] += 1)
        end
        counter
    end
    bar = n ÷ k + 1
    return [key for (key, v) in global_counter if v >= bar]
end
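Note that the worker processes must be able to see SharedArrays and every function they call inside the @distributed loops. Starting Julia with julia -p 7 (as in the benchmark below) is one option; a minimal in-session setup sketch could look like the following.
using Distributed
addprocs(7)                      # or start Julia with `julia -p 7`
@everywhere using SharedArrays   # workers must load SharedArrays as well
# BoyerMoore must also be defined on every process, e.g. by prefixing its
# definition with @everywhere, since the workers call it inside @distributed.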
Multi-Threading
In the recently released v1.5, multi-threading is no longer an experimental feature in Julia.
function parallel_majority_element(A::Vector{T}, p::Int, k::Int=2)::Vector{T} where T
    @assert k >= 2 "k must be an integer no less than 2"
    n = length(A)
    step = n ÷ p
    pool = Vector{Dict{T, Int}}(undef, p)
    # Phase 1: per-chunk candidates.
    Threads.@threads for i = 1:p
        left = (i - 1) * step + 1
        right = i == p ? n : i * step
        pool[i] = BoyerMoore(view(A, left:right), k)
    end
    # Merge the per-chunk candidates, passing k through to the merge.
    candidates = reduce((x, y) -> merge_candidates!(x, y, k), pool)
    # Phase 2: count the global candidates in each chunk, then sum.
    Threads.@threads for i = 1:p
        pool[i] = Dict(key => 0 for (key, v) in candidates)
        left = (i - 1) * step + 1
        right = i == p ? n : i * step
        for a in view(A, left:right)
            haskey(pool[i], a) && (pool[i][a] += 1)
        end
    end
    counter = reduce(mergewith(+), pool)
    bar = n ÷ k + 1
    return [key for (key, v) in counter if v >= bar]
end
Complexity analysis for distributed version
We parallelize the procedure with merge_candidates!: the arguments X and Y may each hold up to k - 1 keys, so a single merge costs O(k²) in the worst case, and merging the p partial results costs O(pk²). Each chunk itself is processed by BoyerMoore in O(nk/p).
The verification part costs less than the above (O(n/p) per chunk plus O(pk) to combine the counters), and we also ignore latency and bandwidth, since they are negligible in this case. Therefore the total cost of the distributed general form majority voting algorithm is O(nk/p + pk²).
4. Benchmark
For the parallel computing benchmark, I started Julia on my laptop with julia -p 7 -t 8.
julia> using BenchmarkTools
julia> nprocs()
8
julia> Threads.nthreads()
8
julia> A = vcat([repeat([i], i) for i = 1:10]...);
julia> A_big = repeat(A, 2_000_000);
julia> @btime y = majority_element($A_big, 11)
3.980 s (8 allocations: 896 bytes)
5-element Array{Int64,1}:
7
9
10
8
6
julia> @btime y = distributed_majority_element($A_big, 8, 11)
4.840 s (2019 allocations: 94.38 KiB)
5-element Array{Int64,1}:
7
9
10
8
6
julia> @btime y = parallel_majority_element($A_big, 8, 11)
1.215 s (180 allocations: 25.23 KiB)
5-element Array{Int64,1}:
7
9
10
8
6
We can see that the multi-processing version is even slower than the sequential version. The much larger number of allocations (2019 vs. 8) could be the cause.
For comparison, I also implemented a sequential version of the majority voting algorithm in Python. The execution time for the same data as above is 33.415 s, far behind Julia.
| language | execution mode | time (s) |
|---|---|---|
| python 3.8.5 | sequential | 33.415 |
| julia 1.5.1 | sequential | 3.980 |
| julia 1.5.1 | multi-processing | 4.840 |
| julia 1.5.1 | multi-threading | 1.215 |
5. Summary
I present a general form of the majority voting algorithm, i.e. each majority element appears more than ⌊n/k⌋ times in an array of length n, together with sequential, multi-processing, and multi-threading implementations in Julia.