Distributed Majority Voting Algorithm in Julia

1. Introduction

The original majority voting problem asks for the element that appears more than half of the time in a given array; if no such majority element exists, the algorithm should return an empty result.

There are extended versions of this problem, such as LeetCode's Majority Element II. We can consider a general form: for an array of \(n\) elements and a given integer \(k \ge 2\), find all elements that appear more than \(n/k\) times in that array. The original problem is then the \(k = 2\) case. For example, for \(A = [1, 1, 2, 2, 3, 1]\) and \(k = 3\), only the element 1 appears more than \(6/3 = 2\) times.

In this post, I will give both sequential and distributed algorithms for this general-form majority voting problem, along with complexity analysis. I will also use the algorithm as an example to explore how to write parallel programs in Julia. It turns out to be a very pleasant journey.

2. Sequential Boyer-Moore algorithm

A brute-force solution could sort the array, or count the appearances of every element, like Python's collections.Counter. However, that costs either \(O(n\log n)\) time or \(O(n)\) extra space. The Boyer-Moore algorithm is the classic solution to this problem, running in \(O(n)\) time with \(O(1)\) extra space.
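For reference, here is a minimal sketch of the counting baseline (the function name majority_element_counting is my own, not from the post):

# Brute-force baseline: count every element with a hash map, O(n) extra space.
function majority_element_counting(A::Vector{T}, k::Int=2)::Vector{T} where T
    counts = Dict{T, Int}()
    for a in A
        counts[a] = get(counts, a, 0) + 1
    end
    bar = length(A) ÷ k + 1  # need strictly more than n/k occurrences
    return [key for (key, v) in counts if v >= bar]
end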

I will not elaborate on the original algorithm, since this post is about the general \(1/k\) form and the distributed scenario. The core insight is the same as in the \(k=2\) case: each non-majority element can annihilate one occurrence of every candidate, and a true majority will still have occurrences left after all such "annihilations".

For a given \(k\), there are at most \(k - 1\) majority elements. We can use a hash map to record all candidates. See the Julia code below for details.

function BoyerMoore(A::AbstractArray{T, 1}, k::Int=2)::Dict{T, Int} where T
    # Keep at most k - 1 candidates, each with a surviving vote count.
    candidates = Dict{T, Int}()
    for a in A
        if length(candidates) < k - 1 || haskey(candidates, a)
            candidates[a] = get!(candidates, a, 0) + 1
        else
            # A new distinct element annihilates one vote from every candidate.
            to_del = Vector{T}()
            for key in keys(candidates)
                candidates[key] -= 1
                candidates[key] <= 0 && push!(to_del, key)
            end
            for key in to_del
                pop!(candidates, key)
            end
        end
    end
    return candidates
end
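As a quick sanity check, here is a small hand-traceable run (my own example). Note that the returned counts are surviving vote tallies, not true frequencies, which is why a verification pass follows:

BoyerMoore([1, 1, 2, 2, 3, 1], 3)
# The 3 annihilates one vote from each of the two candidates,
# and the final 1 restores a vote: Dict(1 => 2, 2 => 1).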


function majority_element(A::Vector{T}, k::Int=2)::Vector{T} where T
    @assert k >= 2 "k must be an integer no less than 2"

    candidates = BoyerMoore(A, k)
    # Verification pass: reset the counts, then tally true frequencies.
    for key in keys(candidates)
        candidates[key] = 0
    end
    for a in A
        haskey(candidates, a) && (candidates[a] += 1)
    end

    bar = div(length(A), k) + 1
    return [key for (key, v) in candidates if v >= bar]
end
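Usage on the same toy array (my own example, not from the original post):

majority_element([1, 1, 2, 2, 3, 1], 3)
# bar = 6 ÷ 3 + 1 = 3; only 1 appears 3 times, so the result is [1]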

Each update of the candidates costs \(O(k)\) in the worst case, which adds up to \(O(n k)\) in total. The verification phase costs \(O(n)\). So the time complexity of the sequential general-form majority voting algorithm is \(O(n k) + O(n) = O(n k)\), and the extra space complexity is \(O(k)\).

3. Distributed majority voting algorithm

The majority element property extends easily to a distributed setting. The candidates obtained from each sub-array can be viewed as a simplified "counter", and we can merge them to get the global candidates.

I will give both multi-processing and multi-threading approaches in Julia.

The only tricky part is the merge phase. When a new distinct element shows up, instead of decrementing every candidate's count by one, we have to subtract the current minimum count (which may be the incoming element's own count). See merge_candidates! below.

Multi-processing

With the help of SharedArrays and the @distributed macro, the parallel code in Julia is very neat.

@everywhere function merge_candidates!(X::Dict{T, Int}, Y::Dict{T, Int}, k::Int=2) where T
    for (key, v) in Y
        if length(X) < k - 1 || haskey(X, key)
            X[key] = get!(X, key, 0) + v
        else
            # Annihilate by the minimum surviving count, which may be
            # the incoming count v itself.
            min_v = min(minimum(values(X)), v)
            to_del = Vector{T}()
            for a in keys(X)
                X[a] -= min_v
                X[a] <= 0 && push!(to_del, a)
            end
            for a in to_del
                pop!(X, a)
            end
            v > min_v && (X[key] = v - min_v)
        end
    end
    return X
end
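A hand-traceable merge (my own example): suppose two sub-arrays returned these candidate sets for \(k = 3\):

X = Dict(1 => 3, 2 => 1)
Y = Dict(3 => 2)
merge_candidates!(X, Y, 3)
# X is full and 3 is new, so min_v = min(min(3, 1), 2) = 1 is subtracted:
# candidate 2 drops to 0 and is removed, 1 keeps 2 votes,
# and 3 enters with 2 - 1 = 1 vote, giving Dict(1 => 2, 3 => 1).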


function distributed_majority_element(A::Vector{T}, p::Int, k::Int=2)::Vector{T} where T
    @assert k >= 2 "k must be an integer no less than 2"

    n = length(A)
    step = n ÷ p

    A = SharedVector(A)
    # Pass k through to the reducer; the default k = 2 would merge incorrectly.
    candidates = @distributed ((X, Y) -> merge_candidates!(X, Y, k)) for i = 1:p
        left = (i - 1) * step + 1
        right = i == p ? n : i * step
        BoyerMoore(view(A, left:right), k)
    end

    global_counter = @distributed mergewith(+) for i = 1:p
        counter = Dict(key => 0 for key in keys(candidates))
        left = (i - 1) * step + 1
        right = i == p ? n : i * step
        for a in view(A, left:right)
            haskey(counter, a) && (counter[a] += 1)
        end
        counter
    end

    bar = n ÷ k + 1
    return [key for (key, v) in global_counter if v >= bar]
end
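One caveat: every function called inside @distributed must be defined on all workers, so BoyerMoore needs the @everywhere prefix as well when running this. A minimal setup sketch, assuming the definitions above live in a file majority.jl (a file name I made up for illustration):

using Distributed
addprocs(7)                          # or start Julia with `julia -p 7`
@everywhere using SharedArrays
@everywhere include("majority.jl")   # defines BoyerMoore, merge_candidates!, etc.

A = repeat(vcat([repeat([i], i) for i = 1:10]...), 1000)
distributed_majority_element(A, nworkers(), 11)  # → [6, 7, 8, 9, 10] in some order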

Multi-threading

As of the recently released v1.5, multi-threading is no longer an experimental feature in Julia.

function parallel_majority_element(A::Vector{T}, p::Int, k::Int=2)::Vector{T} where T
    @assert k >= 2 "k must be an integer no less than 2"

    n = length(A)
    step = n ÷ p

    pool = Vector{Dict{T, Int}}(undef, p)
    Threads.@threads for i = 1:p
        left = (i - 1) * step + 1
        right = i == p ? n : i * step
        pool[i] = BoyerMoore(view(A, left:right), k)
    end

    # Pass k through to the reducer; the default k = 2 would merge incorrectly.
    candidates = reduce((X, Y) -> merge_candidates!(X, Y, k), pool)

    Threads.@threads for i = 1:p
        pool[i] = Dict(key => 0 for key in keys(candidates))
        left = (i - 1) * step + 1
        right = i == p ? n : i * step
        for a in view(A, left:right)
            haskey(pool[i], a) && (pool[i][a] += 1)
        end
    end
    counter = reduce(mergewith(+), pool)
    bar = n ÷ k + 1
    return [key for (key, v) in counter if v >= bar]
end
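Unlike processes, threads cannot be added after startup, so the sketch below (my own usage example, reusing the benchmark's data pattern) assumes Julia was launched with julia -t 8 or with JULIA_NUM_THREADS set:

A = repeat(vcat([repeat([i], i) for i = 1:10]...), 1000)
parallel_majority_element(A, Threads.nthreads(), 11)  # → [6, 7, 8, 9, 10] in some order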

Complexity analysis for distributed version

We parallelize the procedure over \(p\) processes or threads, so each sub-array has \(O(\frac{n}{p})\) elements and the candidate-updating time is \(O(\frac{n}{p}k)\). In merge_candidates!, the arguments X and Y each hold at most \(k - 1\) elements, so one merge costs \(O(k^2)\), which gives \(O(k^2 \log p)\) for the whole tree reduction.

The verification phase costs less than the above, and we also ignore latency and bandwidth, since they are negligible here. Therefore the total cost of the distributed general-form majority voting algorithm is \[ O(\frac{nk}{p} + k^2\log p) \] The extra space is also \(O(k)\) on each node.

4. Benchmark

For a parallel computing benchmark, I started Julia on my laptop with julia -p 7 -t 8.

julia> using BenchmarkTools

julia> nprocs()
8

julia> Threads.nthreads()
8

julia> A = vcat([repeat([i], i) for i = 1:10]...);

julia> A_big = repeat(A, 2_000_000);

julia> @btime y = majority_element($A_big, 11)
  3.980 s (8 allocations: 896 bytes)
5-element Array{Int64,1}:
  7
  9
 10
  8
  6

julia> @btime y = distributed_majority_element($A_big, 8, 11)
  4.840 s (2019 allocations: 94.38 KiB)
5-element Array{Int64,1}:
  7
  9
 10
  8
  6

julia> @btime y = parallel_majority_element($A_big, 8, 11)
  1.215 s (180 allocations: 25.23 KiB)
5-element Array{Int64,1}:
  7
  9
 10
  8
  6

We can see the multi-processing version is slower than the sequential version. The far larger number of allocations (2019 vs. 8) could be the cause.

For comparison, I also implemented a sequential majority voting algorithm in Python. The execution time on the same data is 33.415 s, far behind Julia.

lang          exec mode         time (s)
------------  ----------------  --------
python 3.8.5  sequential          33.415
julia 1.5.1   sequential           3.980
julia 1.5.1   multi-processing     4.840
julia 1.5.1   multi-threading      1.215

5. Summary

I presented a general form of the majority voting algorithm, i.e. finding every element that appears more than \(n/k\) times in a given array. The sequential algorithm has a time complexity of \(O(n k)\), and the distributed version costs \(O(\frac{nk}{p} + k^2\log p)\). Both use \(O(k)\) extra space locally. You can find the complete Julia implementation in this pull request. And as the benchmark shows, Julia's parallel computing is easy and fast.
