Distributed Majority Voting Algorithm in Julia

1. Introduction

The original majority voting problem is to find the element that appears more than half the time in a given array; if no such majority element exists, the algorithm should return an empty result.

There are extended versions of this problem, such as LeetCode's Majority Element II. We can consider a general form: for an array of \(n\) elements and a given integer \(k \ge 2\), find all elements that appear more than \(n/k\) times in that array. The original problem is then the \(k = 2\) case.

In this post, I will give both sequential and distributed algorithms for this general form of the majority voting problem, along with complexity analysis. I will also use these algorithms as an example to explore how to write parallel programs in Julia. It turns out to be a very pleasant journey.

2. Sequential Boyer-Moore algorithm

A brute-force solution could sort the array, or count the appearances of every element, like Python’s collections.Counter. However, that costs either \(O(n\log n)\) time or \(O(n)\) extra space. The Boyer-Moore algorithm is the classic solution to this problem; it runs in \(O(n)\) time and \(O(1)\) extra space.
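
For reference, the counting approach can be sketched in a few lines of Julia. This is only a baseline for checking results on small inputs, not part of the algorithm in this post; the name majority_by_counting is made up for illustration.

```julia
# Brute-force baseline: count every element, then filter by the n/k threshold.
# Uses O(n) extra space in the worst case (all elements distinct).
function majority_by_counting(A::AbstractVector{T}, k::Int=2) where T
    counts = Dict{T, Int}()
    for a in A
        counts[a] = get(counts, a, 0) + 1
    end
    # For integer counts, "more than n/k" is the same as "greater than floor(n/k)".
    threshold = length(A) ÷ k
    return [key for (key, c) in counts if c > threshold]
end

majority_by_counting([1, 1, 2, 2, 3], 3)   # contains 1 and 2 (order not guaranteed)
```

The result order follows the Dict iteration order, so compare results as sets when testing.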

I will not elaborate on the original algorithm, since this post is about the general \(1/k\) form and the distributed scenario. The core insight is the same as in the \(k=2\) case: each non-majority element can be paired off with one copy of a majority element, and a true majority element will still have copies left after this “annihilation”.

For a given \(k\), there are at most \(k - 1\) majority elements. We can use a hash map to record all candidates. See the Julia code below for details.
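
For readers who have not seen the \(k=2\) case, here is a minimal sketch of the classic vote. The function name boyer_moore_k2 is my own, and it returns nothing when no majority exists.

```julia
# Classic Boyer-Moore majority vote for k = 2 (sketch).
function boyer_moore_k2(A::AbstractVector{T}) where T
    isempty(A) && return nothing
    candidate, votes = first(A), 0
    for a in A
        if votes == 0
            candidate, votes = a, 1   # previous candidate was fully annihilated
        elseif a == candidate
            votes += 1
        else
            votes -= 1                # pair `a` off against one copy of the candidate
        end
    end
    # The survivor is only a candidate; a second pass confirms it.
    return count(==(candidate), A) > length(A) ÷ 2 ? candidate : nothing
end

boyer_moore_k2([1, 2, 1, 3, 1])   # returns 1
boyer_moore_k2([1, 2, 3])         # returns nothing
```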

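function BoyerMoore(A::AbstractArray{T, 1}, k::Int=2)::Dict{T, Int} where T
    candidates = Dict{T, Int}()
    for a in A
        if length(candidates) < k - 1 || haskey(candidates, a)
            # Free slot or known candidate: just count it.
            candidates[a] = get(candidates, a, 0) + 1
        else
            # Map is full and `a` is new: annihilate one copy of every candidate.
            to_del = Vector{T}()
            for key in keys(candidates)
                candidates[key] -= 1
                # push! (not append!) so non-numeric keys such as strings work too
                candidates[key] <= 0 && push!(to_del, key)
            end
            for key in to_del
                pop!(candidates, key)
            end
        end
    end
    return candidates
end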

function majority_element(A::Vector{T}, k::Int=2)::Vector{T} where T
    @assert k >= 2 "k must be an integer no less than 2"

    candidates = BoyerMoore(A, k)
    # Verification pass: reset the counters and count the candidates exactly.
    for key in keys(candidates)
        candidates[key] = 0
    end
    for a in A
        haskey(candidates, a) && (candidates[a] += 1)
    end

    # Smallest integer count that is strictly greater than n/k.
    bar = div(length(A), k) + 1
    return [key for (key, v) in candidates if v >= bar]
end

Each element may trigger an \(O(k)\) update of the candidates, which adds up to \(O(n k)\) in total. The verification phase costs \(O(n)\). So the time complexity of the sequential general-form majority voting algorithm is \(O(n k) + O(n) = O(n k)\). The extra space complexity is \(O(k)\).

3. Distributed majority voting algorithm

The majority element property can easily be extended to a distributed version. The candidates obtained from each sub-array can be viewed as a simplified “counter”, and we can merge them to get the global candidates.

I will give both multi-processing and multi-threading approaches in Julia.

The only tricky part is the merge phase. When a new distinct element shows up and the candidate map is full, instead of decreasing every candidate’s count by one, we have to subtract the current minimum count, taken over the existing candidates’ counts and the incoming element’s count. See merge_candidates! below.
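
To make the subtraction concrete, here is a small worked example of a single merge step for \(k = 3\), where the map holds at most two candidates. The helper absorb_full! is a hypothetical name used only for this illustration; it handles just the case where the map is full and the incoming key is new.

```julia
# One merge step: a new key with local count v arrives at a full summary X.
function absorb_full!(X::Dict{T, Int}, key::T, v::Int) where T
    m = min(minimum(values(X)), v)   # the most that every counter can give up
    for a in collect(keys(X))        # iterate over a copy so deleting is safe
        X[a] -= m
        X[a] <= 0 && delete!(X, a)
    end
    v > m && (X[key] = v - m)        # the newcomer keeps whatever count is left
    return X
end

X = Dict(1 => 5, 2 => 3)   # full summary for k = 3
absorb_full!(X, 3, 4)      # element 3 arrives with count 4
# m = min(3, 4) = 3, so X becomes Dict(1 => 2, 3 => 1) and element 2 is dropped
```

Subtracting 3 at once has the same effect as three rounds of the one-by-one annihilation used in the sequential algorithm.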


With the help of SharedArrays and the @distributed macro, the parallel code in Julia is very neat.

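using Distributed, SharedArrays

# BoyerMoore must also be defined on every worker (e.g. with @everywhere).
@everywhere function merge_candidates!(X::Dict{T, Int}, Y::Dict{T, Int}, k::Int=2) where T
    for (key, v) in Y
        if length(X) < k - 1 || haskey(X, key)
            X[key] = get(X, key, 0) + v
        else
            # X is full and `key` is new: subtract the smallest count from everyone.
            min_v = min(minimum(values(X)), v)
            to_del = Vector{T}()
            for a in keys(X)
                X[a] -= min_v
                X[a] <= 0 && push!(to_del, a)
            end
            for a in to_del
                pop!(X, a)
            end
            # The newcomer keeps whatever count survives the subtraction.
            v > min_v && (X[key] = v - min_v)
        end
    end
    return X
end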

function distributed_majority_element(A::Vector{T}, p::Int, k::Int=2)::Vector{T} where T
    @assert k >= 2 "k must be an integer no less than 2"

    n = length(A)
    step = n ÷ p

    A = SharedVector(A)
    # The reducer is called with two arguments only, so pass k explicitly;
    # a bare merge_candidates! would silently fall back to k = 2.
    merge_k = (X, Y) -> merge_candidates!(X, Y, k)
    candidates = @distributed merge_k for i = 1:p
        left = (i - 1) * step + 1
        right = i == p ? n : i * step
        BoyerMoore(view(A, left:right), k)
    end

    global_counter = @distributed mergewith(+) for i = 1:p
        counter = Dict(key => 0 for (key, v) in candidates)
        left = (i - 1) * step + 1
        right = i == p ? n : i * step
        for a in view(A, left:right)
            haskey(counter, a) && (counter[a] += 1)
        end
        counter  # value of the loop body, handed to the reducer
    end

    bar = n ÷ k + 1
    return [key for (key, v) in global_counter if v >= bar]
end
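
The left/right arithmetic used to split A into \(p\) chunks can be checked in isolation. The one-liner below (chunk_range is a name I made up for this sketch) reproduces it and shows that the last chunk absorbs the remainder when \(p\) does not divide \(n\).

```julia
# Same index arithmetic as in the loop bodies: chunk i of p over 1:n.
chunk_range(i, n, p) = ((i - 1) * (n ÷ p) + 1):(i == p ? n : i * (n ÷ p))

ranges = [chunk_range(i, 10, 3) for i in 1:3]
# ranges == [1:3, 4:6, 7:10]; the last chunk gets the extra element
```

The chunks are disjoint and cover 1:n, so every element is counted exactly once in both phases.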


As of the recently released v1.5, multi-threading is no longer an experimental feature in Julia, so the same algorithm can also be written with Threads.@threads.

function parallel_majority_element(A::Vector{T}, p::Int, k::Int=2)::Vector{T} where T
    @assert k >= 2 "k must be an integer no less than 2"

    n = length(A)
    step = n ÷ p

    # Phase 1: each task summarizes its own chunk into pool[i].
    pool = Vector{Dict{T, Int}}(undef, p)
    Threads.@threads for i = 1:p
        left = (i - 1) * step + 1
        right = i == p ? n : i * step
        pool[i] = BoyerMoore(view(A, left:right), k)
    end

    # Pass k explicitly; a bare merge_candidates! would fall back to k = 2.
    candidates = reduce((X, Y) -> merge_candidates!(X, Y, k), pool)

    # Phase 2: count the surviving candidates exactly, chunk by chunk.
    Threads.@threads for i = 1:p
        pool[i] = Dict(key => 0 for (key, v) in candidates)
        left = (i - 1) * step + 1
        right = i == p ? n : i * step
        for a in view(A, left:right)
            haskey(pool[i], a) && (pool[i][a] += 1)
        end
    end
    counter = reduce(mergewith(+), pool)
    bar = n ÷ k + 1
    return [key for (key, v) in counter if v >= bar]
end

Complexity analysis for distributed version

We parallelize the procedure with \(p\) processes or threads, so each sub-array has \(O(\frac{n}{p})\) elements and the local update time is \(O(\frac{n}{p}k)\). In merge_candidates!, the arguments X and Y each have at most \(k - 1\) elements, so one merge costs \(O(k^2)\), which gives \(O(k^2 \log p)\) for the whole reduction, assuming a tree-shaped reduction over the \(p\) partial results.

The verification part costs less than the above, and we also ignore latency and bandwidth costs, since they are small in this case. Therefore, the total cost of the distributed general-form majority voting algorithm is \[ O(\frac{nk}{p} + k^2\log p) \] The extra space is also \(O(k)\) on each node.
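
To get a feel for the two terms, the sketch below plugs in the sizes used in the benchmark in section 4 (\(n = 110{,}000{,}000\), \(k = 11\), \(p = 8\)). Constant factors are ignored and a base-2 logarithm is assumed, so the numbers are only relative units, not predicted running times.

```julia
# Cost-model terms only: no constants, log base 2 assumed for the reduction.
sequential_cost(n, k) = n * k
distributed_cost(n, k, p) = n * k / p + k^2 * log2(p)

n, k, p = 110_000_000, 11, 8
local_term = n * k / p          # 151_250_000 units of per-chunk work
merge_term = k^2 * log2(p)      # 363 units for the reduction
ideal_speedup = sequential_cost(n, k) / distributed_cost(n, k, p)   # just under 8
```

For these sizes the merge term is negligible, so the model predicts a speedup close to \(p\); the measured numbers depend on overheads the model leaves out.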

4. Benchmark

For a parallel computing benchmark, I started Julia on my laptop with julia -p 7 -t 8.

julia> using BenchmarkTools

julia> nprocs()

julia> Threads.nthreads()

julia> A = vcat([repeat([i], i) for i = 1:10]...);

julia> A_big = repeat(A, 2_000_000);

julia> @btime y = majority_element($A_big, 11)
  3.980 s (8 allocations: 896 bytes)
5-element Array{Int64,1}:

julia> @btime y = distributed_majority_element($A_big, 8, 11)
  4.840 s (2019 allocations: 94.38 KiB)
5-element Array{Int64,1}:

julia> @btime y = parallel_majority_element($A_big, 8, 11)
  1.215 s (180 allocations: 25.23 KiB)
5-element Array{Int64,1}:

We can see that the multi-processing version is slower than the sequential version. The much larger number of allocations (2019 vs 8) could be the cause.

For comparison, I also implemented a sequential version of the majority voting algorithm in Python. The execution time for the same data is 33.415 s, way behind Julia.

lang          exec mode         time (s)
python 3.8.5  sequential        33.415
julia 1.5.1   sequential        3.980
julia 1.5.1   multi-processing  4.840
julia 1.5.1   multi-threading   1.215
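
The timings above translate into speedups as follows. This is plain arithmetic on the reported numbers; the 8 in the efficiency line is the thread count from the -t 8 flag.

```julia
# Timings in seconds, copied from the table above.
t_python, t_seq, t_proc, t_thread = 33.415, 3.980, 4.840, 1.215

speedup_thread = t_seq / t_thread     # ≈ 3.28x over sequential Julia
speedup_proc   = t_seq / t_proc       # ≈ 0.82x, i.e. a slowdown
efficiency     = speedup_thread / 8   # ≈ 0.41 per thread
julia_vs_python = t_python / t_seq    # ≈ 8.4x
```

So the multi-threaded version uses its 8 threads at roughly 41% efficiency on this input.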

5. Summary

I presented a general form of the majority voting algorithm, in which each majority element appears more than \(n/k\) times in a given array of \(n\) elements. The sequential algorithm has a time complexity of \(O(n k)\), and the distributed version costs \(O(\frac{nk}{p} + k^2\log p)\). Both use \(O(k)\) extra space locally. You can find the complete implementation in Julia in this pull request. The benchmark also shows that parallel computation in Julia is easy to write and, with multi-threading, fast.

