Distributed Majority Voting Algorithm in Julia
1. Introduction
The original majority voting problem asks for the element that appears more than \(n/2\) times in an array of \(n\) elements; if no such majority element exists, the algorithm should return an empty result.
There are extended versions of this problem, such as LeetCode's Majority Element II. We can consider a general form: for an array of \(n\) elements and a given integer \(k \ge 2\), find all elements that appear more than \(n/k\) times in that array. The original problem is then the \(k = 2\) case.
In this post, I will give both sequential and distributed algorithms for this general-form majority voting problem, along with complexity analysis. I will also use this algorithm as an example to explore how to write parallel programs in Julia. It turns out to be a very pleasant journey.
2. Sequential Boyer-Moore algorithm
The brute-force solutions are sorting the array, or counting the occurrences of every element, as with Python's collections.Counter. However, those cost either \(O(n\log n)\) time or \(O(n)\) extra space. The Boyer-Moore algorithm is the classic solution to this problem, running in \(O(n)\) time with \(O(1)\) extra space.
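For reference, here is a minimal counting sketch in Julia (brute_force is a hypothetical helper for illustration, not part of the implementation below), trading \(O(n)\) extra space for simplicity:
function brute_force(A::Vector{T}, k::Int=2)::Vector{T} where T
    counts = Dict{T, Int}()                # full counter: O(n) extra space
    for a in A
        counts[a] = get(counts, a, 0) + 1
    end
    bar = length(A) ÷ k + 1                # "more than n/k times" threshold
    return [key for (key, v) in counts if v >= bar]
end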
I will not elaborate on the original algorithm, since this post is about the general \(1/k\) form and the distributed scenario. The core insight is the same as in the \(k = 2\) case: every occurrence of a non-majority element can cancel out one occurrence of a majority candidate, and a true majority still has occurrences left after all such "annihilations".
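To make the annihilation idea concrete, here is the classic \(k = 2\) case as a minimal illustrative sketch (boyer_moore_k2 is my own name for it, separate from the general code below):
function boyer_moore_k2(A::AbstractVector)
    candidate, count = first(A), 0
    for a in A
        if count == 0
            candidate, count = a, 1    # adopt a new candidate
        elseif a == candidate
            count += 1
        else
            count -= 1                 # one-for-one annihilation
        end
    end
    return candidate                   # still needs a verification pass
end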
For a given \(k\), there are at most \(k - 1\) majority elements, so we can use a hash map to record all candidates. See the Julia code below for details.
function BoyerMoore(A::AbstractArray{T, 1}, k::Int=2)::Dict{T, Int} where T
    # Maintain at most k - 1 candidates together with their surviving counts.
    candidates = Dict{T, Int}()
    for a in A
        if length(candidates) < k - 1 || haskey(candidates, a)
            candidates[a] = get!(candidates, a, 0) + 1
        else
            # a is a new element while the candidate set is full:
            # annihilate one occurrence of every candidate (and of a itself).
            to_del = Vector{T}()
            for key in keys(candidates)
                candidates[key] -= 1
                candidates[key] <= 0 && push!(to_del, key)
            end
            for key in to_del
                pop!(candidates, key)
            end
        end
    end
    return candidates
end
function majority_element(A::Vector{T}, k::Int=2)::Vector{T} where T
    @assert k >= 2 "k must be an integer no less than 2"
    candidates = BoyerMoore(A, k)
    # Verification pass: recount the surviving candidates exactly.
    for key in keys(candidates)
        candidates[key] = 0
    end
    for a in A
        haskey(candidates, a) && (candidates[a] += 1)
    end
    bar = div(length(A), k) + 1   # "more than n/k times" threshold
    return [key for (key, v) in candidates if v >= bar]
end
In the worst case, each lookup triggers an \(O(k)\) update of candidates, which adds up to \(O(n k)\) in total. The verification phase still costs \(O(n)\). So the time complexity of the sequential general-form majority voting algorithm is \(O(n k) + O(n) = O(n k)\), and the extra space complexity is \(O(k)\).
3. Distributed majority voting algorithm
The majority element property can be easily extended to a distributed version. The candidates obtained from each subarray can be viewed as a simplified "counter", and we can merge them to get the global candidates.
I will give both multi-processing and multi-threading approaches in Julia.
The only tricky part is the merge phase. When a new distinct element shows up, instead of decreasing every candidate's count by one, we have to subtract the current minimum count (which may or may not come from the new element). See merge_candidates! below.
Multi-processing
With the help of SharedArrays and the @distributed macro, the parallel code in Julia is very neat. Note that BoyerMoore must also be defined under @everywhere so that the worker processes can call it.
@everywhere function merge_candidates!(X::Dict{T, Int}, Y::Dict{T, Int}, k::Int=2) where T
    for (key, v) in Y
        if length(X) < k - 1 || haskey(X, key)
            X[key] = get!(X, key, 0) + v
        else
            # Annihilate the current minimum count from every candidate,
            # including the incoming one.
            min_v = min(minimum(values(X)), v)
            to_del = Vector{T}()
            for a in keys(X)
                X[a] -= min_v
                X[a] <= 0 && push!(to_del, a)
            end
            for a in to_del
                pop!(X, a)
            end
            v > min_v && (X[key] = v - min_v)
        end
    end
    return X
end
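A tiny worked example of the merge rule, with made-up candidate dicts for \(k = 2\):
julia> X = Dict("a" => 3); Y = Dict("b" => 2);

julia> merge_candidates!(X, Y, 2)   # min count 2 is annihilated from both sides
Dict{String,Int64} with 1 entry:
  "a" => 1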
function distributed_majority_element(A::Vector{T}, p::Int, k::Int=2)::Vector{T} where T
    @assert k >= 2 "k must be an integer no less than 2"
    n = length(A)
    step = n ÷ p
    A = SharedVector(A)
    # The reducer must be a binary function, so close over k here;
    # a bare merge_candidates! would silently fall back to its default k = 2.
    candidates = @distributed ((x, y) -> merge_candidates!(x, y, k)) for i = 1:p
        left = (i - 1) * step + 1
        right = i == p ? n : i * step
        BoyerMoore(view(A, left:right), k)
    end
    global_counter = @distributed mergewith(+) for i = 1:p
        counter = Dict(key => 0 for key in keys(candidates))
        left = (i - 1) * step + 1
        right = i == p ? n : i * step
        for a in view(A, left:right)
            haskey(counter, a) && (counter[a] += 1)
        end
        counter
    end
    bar = n ÷ k + 1
    return [key for (key, v) in global_counter if v >= bar]
end
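A minimal end-to-end sketch, assuming Julia was started with worker processes (e.g. julia -p 3) and that BoyerMoore is also defined under @everywhere so the workers can call it:
using Distributed, SharedArrays

A = repeat([1, 1, 1, 2, 2, 3], 100)             # element 1 fills 300 of 600 slots
distributed_majority_element(A, nworkers(), 3)  # expected: [1]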
Multi-threading
As of the recently released v1.5, multi-threading is no longer an experimental feature in Julia.
function parallel_majority_element(A::Vector{T}, p::Int, k::Int=2)::Vector{T} where T
    @assert k >= 2 "k must be an integer no less than 2"
    n = length(A)
    step = n ÷ p
    pool = Vector{Dict{T, Int}}(undef, p)
    Threads.@threads for i = 1:p
        left = (i - 1) * step + 1
        right = i == p ? n : i * step
        pool[i] = BoyerMoore(view(A, left:right), k)
    end
    # Close over k here as well; a bare merge_candidates! would use k = 2.
    candidates = reduce((x, y) -> merge_candidates!(x, y, k), pool)
    Threads.@threads for i = 1:p
        pool[i] = Dict(key => 0 for key in keys(candidates))
        left = (i - 1) * step + 1
        right = i == p ? n : i * step
        for a in view(A, left:right)
            haskey(pool[i], a) && (pool[i][a] += 1)
        end
    end
    counter = reduce(mergewith(+), pool)
    bar = n ÷ k + 1
    return [key for (key, v) in counter if v >= bar]
end
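Usage mirrors the sequential function; a quick sketch, assuming Julia was started with multiple threads (e.g. julia -t 8):
A = repeat([1, 1, 1, 2, 2, 3], 100)
parallel_majority_element(A, Threads.nthreads(), 3)   # expected: [1]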
Complexity analysis for the distributed version
We parallelize the procedure with \(p\) processes or threads, so each subarray has \(O(\frac{n}{p})\) elements and the updating time is \(O(\frac{n}{p}k)\). In merge_candidates!, the arguments X and Y each hold at most \(k - 1\) elements, so one merge costs \(O(k^2)\), which gives \(O(k^2 \log p)\) for the whole tree reduction.
The verification part costs less than the above, and we also ignore latency and bandwidth, since they are negligible in this case. Therefore the total cost of the distributed general-form majority voting algorithm is \[ O\left(\frac{nk}{p} + k^2\log p\right) \] The extra space is also \(O(k)\) on each node.
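Plugging in the benchmark configuration below (\(n \approx 1.1 \times 10^8\), \(k = 11\), \(p = 8\)) as a rough estimate: \[ \frac{nk}{p} + k^2\log p \approx \frac{1.1\times 10^{8} \times 11}{8} + 11^{2}\log_2 8 \approx 1.5\times 10^{8}, \] so the merge term is negligible and the per-node scan dominates.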
4. Benchmark
For a parallel computing benchmark, I started Julia on my laptop with julia -p 7 -t 8.
julia> using BenchmarkTools
julia> nprocs()
8
julia> Threads.nthreads()
8
julia> A = vcat([repeat([i], i) for i = 1:10]...);
julia> A_big = repeat(A, 2_000_000);
julia> @btime y = majority_element($A_big, 11)
3.980 s (8 allocations: 896 bytes)
5-element Array{Int64,1}:
7
9
10
8
6
julia> @btime y = distributed_majority_element($A_big, 8, 11)
4.840 s (2019 allocations: 94.38 KiB)
5-element Array{Int64,1}:
7
9
10
8
6
julia> @btime y = parallel_majority_element($A_big, 8, 11)
1.215 s (180 allocations: 25.23 KiB)
5-element Array{Int64,1}:
7
9
10
8
6
We can see that the multi-processing version is even slower than the sequential version; the far larger number of allocations (2019 vs. 8) and the overhead of inter-process communication are likely causes.
For comparison, I also implemented a sequential majority voting algorithm in Python. Its execution time on the same data is 33.415 s, way behind Julia.
| lang | exec mode | time (s) |
| --- | --- | --- |
| python 3.8.5 | sequential | 33.415 |
| julia 1.5.1 | sequential | 3.980 |
| julia 1.5.1 | multi-processing | 4.840 |
| julia 1.5.1 | multi-threading | 1.215 |
5. Summary
I presented a general form of the majority voting algorithm, where each majority element appears more than \(n/k\) times in a given array. The sequential algorithm has a time complexity of \(O(n k)\), and the distributed version costs \(O(\frac{nk}{p} + k^2\log p)\). Both use \(O(k)\) extra space locally. You can find the complete implementation in Julia in this pull request. As the benchmark experiment shows, parallel computing in Julia is both easy and fast.