Close

Java Concurrency - CountedCompleter Examples

Java Concurrency Java 


CountedCompleter is part of the Java Fork/Join framework. It is another subclass of ForkJoinTask.

CountedCompleter keeps track of a pending task count (just a count, nothing else) and, once that count has dropped to zero, notifies the task implementation by calling its onCompletion method.

The pending count is increased by client code on each call to CountedCompleter#addToPendingCount(). We have to call this method for each new task that is forked.

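The method CountedCompleter#tryComplete() should be called exactly once within the compute method, just before it returns. If the pending count is non-zero, this call decrements it; otherwise it invokes onCompletion and then tries to complete the parent task in the same way. CountedCompleter#propagateCompletion(), used in some of the examples below, does the same but without invoking onCompletion.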

CountedCompleter can optionally return a computed value. To do so, we have to override the method getRawResult() to return that value.


In this example the first task splits itself into sub-tasks using a while loop. A value is also returned from the task.

package com.logicbig.example;

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountedCompleter;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Sums the factorials of the integers 3..19 using a {@link CountedCompleter}.
 *
 * <p>The task repeatedly cuts a fixed-size chunk off the end of its list inside a
 * {@code while} loop and forks one sub-task per chunk; whatever is left over is
 * processed by the task itself. All tasks add into one shared
 * {@link AtomicReference}, whose value is exposed through {@code getRawResult()}.
 */
public class CountedCompleterExample3 {

    public static void main(String[] args) {
        List<BigInteger> list = new ArrayList<>();
        for (int i = 3; i < 20; i++) {
            list.add(BigInteger.valueOf(i));
        }

        BigInteger sum = ForkJoinPool.commonPool()
                .invoke(new FactorialTask(null, new AtomicReference<>(BigInteger.ZERO), list));
        System.out.println("Sum of the factorials = " + sum);
    }

    /**
     * Task that adds the factorial of every element of its list to a shared accumulator.
     */
    private static class FactorialTask extends CountedCompleter<BigInteger> {
        /** Maximum number of elements handed to a single forked sub-task. */
        private static final int SEQUENTIAL_THRESHOLD = 5;

        /** Elements still owned by this task; shrinks as chunks are handed to sub-tasks. */
        private List<BigInteger> integerList;

        /** Accumulator shared by the root task and every sub-task. */
        private final AtomicReference<BigInteger> result;

        /**
         * @param parent      the completer to notify when this task finishes, or {@code null} for the root
         * @param result      shared accumulator the factorials are added to
         * @param integerList the numbers whose factorials this task is responsible for
         */
        private FactorialTask(CountedCompleter<BigInteger> parent,
                              AtomicReference<BigInteger> result,
                              List<BigInteger> integerList) {
            super(parent);
            this.integerList = integerList;
            this.result = result;
        }

        /** Returns the current value of the shared accumulator. */
        @Override
        public BigInteger getRawResult() {
            return result.get();
        }

        /**
         * Forks one sub-task for every trailing chunk of {@code SEQUENTIAL_THRESHOLD}
         * elements, then processes the remaining head of the list in this task.
         */
        @Override
        public void compute() {
            // This example creates all sub-tasks in this while loop.
            while (integerList.size() > SEQUENTIAL_THRESHOLD) {
                int splitIndex = integerList.size() - SEQUENTIAL_THRESHOLD;

                // Tail of the list, containing SEQUENTIAL_THRESHOLD items, goes to the new task.
                List<BigInteger> newTaskList = integerList.subList(splitIndex, integerList.size());

                // This task keeps the remaining head of the list.
                integerList = integerList.subList(0, splitIndex);

                // Register the child in the pending count before it is forked.
                addToPendingCount(1);
                new FactorialTask(this, result, newTaskList).fork();
            }
            // Sum the factorials of whatever is left in this.integerList.
            sumFactorials();
            propagateCompletion();
        }

        /** Atomically adds {@code factorial} to the shared accumulator. */
        private void addFactorialToResult(BigInteger factorial) {
            result.getAndAccumulate(factorial, BigInteger::add);
        }

        /**
         * Adds the factorial of each element of {@code integerList} to the accumulator.
         * {@code CalcUtil} is defined outside this listing.
         */
        private void sumFactorials() {
            for (BigInteger i : integerList) {
                addFactorialToResult(CalcUtil.calculateFactorial(i));
            }
        }
    }
}

Output

Sum of the factorials = 128425485935180310
Original Post




In this example each task forks two tasks.

package com.logicbig.example;

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountedCompleter;
import java.util.concurrent.ForkJoinPool;

/**
 * Prints the factorials of the integers 3..19 using a {@link CountedCompleter}
 * in which every non-leaf task forks two sub-tasks, one per half of its list.
 */
public class CountedCompleterExample {

    public static void main(String[] args) {
        List<BigInteger> list = new ArrayList<>();
        for (int i = 3; i < 20; i++) {
            list.add(BigInteger.valueOf(i));
        }
        ForkJoinPool.commonPool().invoke(new FactorialTask(null, list));
    }

    /**
     * Task that prints the factorial of every element of its list, splitting the
     * list in two while it is longer than {@code SEQUENTIAL_THRESHOLD}.
     */
    private static class FactorialTask extends CountedCompleter<Void> {

        /** Lists of at most this many elements are processed directly instead of being split. */
        private static final int SEQUENTIAL_THRESHOLD = 5;

        /** The numbers this task is responsible for. */
        private final List<BigInteger> integerList;

        /** Number of factorials this task instance has printed itself. */
        private int numberCalculated;

        /**
         * @param parent      the completer to notify when this task finishes, or {@code null} for the root
         * @param integerList the numbers whose factorials this task is responsible for
         */
        private FactorialTask(CountedCompleter<Void> parent, List<BigInteger> integerList) {
            super(parent);
            this.integerList = integerList;
        }

        /**
         * Processes the list directly when it is small enough; otherwise forks one
         * sub-task for each half. In both cases {@code tryComplete()} is called once
         * before returning.
         */
        @Override
        public void compute() {
            if (integerList.size() <= SEQUENTIAL_THRESHOLD) {
                showFactorial();
            } else {
                int middle = integerList.size() / 2;
                List<BigInteger> rightList = integerList.subList(middle, integerList.size());
                List<BigInteger> leftList = integerList.subList(0, middle);

                // Register both children in the pending count before either is forked.
                addToPendingCount(2);
                FactorialTask taskRight = new FactorialTask(this, rightList);
                FactorialTask taskLeft = new FactorialTask(this, leftList);
                taskLeft.fork();
                taskRight.fork();
            }
            tryComplete();
        }

        /**
         * Prints a summary line when this task triggered its own completion.
         *
         * @param caller the task whose completion call led to this callback; it equals
         *               {@code this} when this task's own {@code tryComplete()} found
         *               the pending count at zero
         */
        @Override
        public void onCompletion(CountedCompleter<?> caller) {
            if (caller == this) {
                System.out.printf("completed thread : %s numberCalculated=%s%n",
                        Thread.currentThread().getName(), numberCalculated);
            }
        }

        /**
         * Prints the factorial of each element of {@code integerList} together with the
         * current thread name, counting the elements in {@code numberCalculated}.
         * {@code CalcUtil} is defined outside this listing.
         */
        private void showFactorial() {
            for (BigInteger i : integerList) {
                BigInteger factorial = CalcUtil.calculateFactorial(i);
                System.out.printf("%s! = %s, thread = %s%n", i, factorial,
                        Thread.currentThread().getName());
                numberCalculated++;
            }
        }
    }
}

Output

3! = 6, thread = ForkJoinPool.commonPool-worker-3
11! = 39916800, thread = ForkJoinPool.commonPool-worker-2
15! = 1307674368000, thread = com.logicbig.example.CountedCompleterExample.main()
7! = 5040, thread = ForkJoinPool.commonPool-worker-1
16! = 20922789888000, thread = com.logicbig.example.CountedCompleterExample.main()
12! = 479001600, thread = ForkJoinPool.commonPool-worker-2
4! = 24, thread = ForkJoinPool.commonPool-worker-3
13! = 6227020800, thread = ForkJoinPool.commonPool-worker-2
17! = 355687428096000, thread = com.logicbig.example.CountedCompleterExample.main()
8! = 40320, thread = ForkJoinPool.commonPool-worker-1
18! = 6402373705728000, thread = com.logicbig.example.CountedCompleterExample.main()
14! = 87178291200, thread = ForkJoinPool.commonPool-worker-2
5! = 120, thread = ForkJoinPool.commonPool-worker-3
completed thread : ForkJoinPool.commonPool-worker-2 numberCalculated=4
19! = 121645100408832000, thread = com.logicbig.example.CountedCompleterExample.main()
9! = 362880, thread = ForkJoinPool.commonPool-worker-1
completed thread : com.logicbig.example.CountedCompleterExample.main() numberCalculated=5
6! = 720, thread = ForkJoinPool.commonPool-worker-3
10! = 3628800, thread = ForkJoinPool.commonPool-worker-1
completed thread : ForkJoinPool.commonPool-worker-3 numberCalculated=4
completed thread : ForkJoinPool.commonPool-worker-1 numberCalculated=4
Original Post




This example is an improved version of the previous one: each task forks only one sub-task and processes the other half of the list itself. It also returns a value.

package com.logicbig.example;

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountedCompleter;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Sums the factorials of the integers 3..19 using a {@link CountedCompleter}
 * in which every split forks only one sub-task; the other half of the list is
 * processed by the splitting task itself.
 */
public class CountedCompleterExample2 {

    public static void main(String[] args) {
        List<BigInteger> list = new ArrayList<>();
        for (int i = 3; i < 20; i++) {
            list.add(BigInteger.valueOf(i));
        }

        BigInteger sum = ForkJoinPool.commonPool()
                .invoke(new FactorialTask(null, new AtomicReference<>(BigInteger.ZERO), list));
        System.out.println("Sum of the factorials = " + sum);
    }

    /**
     * Task that adds the factorial of every element of its list to a shared accumulator.
     */
    private static class FactorialTask extends CountedCompleter<BigInteger> {
        /** Lists of at most this many elements are processed directly instead of being split. */
        private static final int SEQUENTIAL_THRESHOLD = 5;

        /** Elements still owned by this task; halved each time a sub-task is forked. */
        private List<BigInteger> integerList;

        /** Accumulator shared by the root task and every sub-task. */
        private final AtomicReference<BigInteger> result;

        /**
         * @param parent      the completer to notify when this task finishes, or {@code null} for the root
         * @param result      shared accumulator the factorials are added to
         * @param integerList the numbers whose factorials this task is responsible for
         */
        private FactorialTask(CountedCompleter<BigInteger> parent,
                              AtomicReference<BigInteger> result,
                              List<BigInteger> integerList) {
            super(parent);
            this.integerList = integerList;
            this.result = result;
        }

        /** Returns the current value of the shared accumulator. */
        @Override
        public BigInteger getRawResult() {
            return result.get();
        }

        /**
         * Processes the list directly when it is small enough. Otherwise hands the
         * right half to a forked sub-task, keeps the left half, and calls itself
         * again. {@code propagateCompletion()} is therefore reached exactly once
         * per task, in the innermost call.
         */
        @Override
        public void compute() {
            if (integerList.size() <= SEQUENTIAL_THRESHOLD) {
                sumFactorials();
                propagateCompletion();
            } else {
                int middle = integerList.size() / 2;
                List<BigInteger> newList = integerList.subList(middle, integerList.size());
                integerList = integerList.subList(0, middle);

                // Register the child in the pending count before it is forked.
                addToPendingCount(1);
                new FactorialTask(this, result, newList).fork();

                // Continue with the retained left half in the current thread.
                compute();
            }
        }

        /** Atomically adds {@code factorial} to the shared accumulator. */
        private void addFactorialToResult(BigInteger factorial) {
            result.getAndAccumulate(factorial, BigInteger::add);
        }

        /**
         * Adds the factorial of each element of {@code integerList} to the accumulator.
         * {@code CalcUtil} is defined outside this listing.
         */
        private void sumFactorials() {
            for (BigInteger i : integerList) {
                addFactorialToResult(CalcUtil.calculateFactorial(i));
            }
        }
    }
}

Output

Sum of the factorials = 128425485935180310
Original Post




See Also