
Reduction operation is widely used to gather values of some variable stored on different threads.

import org.pcj.*;

public class PcjReduction implements StartPoint {

enum Shared { a }
public long a;

public static void main(String[] args) throws IOException {
    String nodesFile  = "nodes.txt";
    PCJ.executionBuilder (PcjExample.class)
                .addNodes(new File("nodes.txt"))

public void main() throws Throwable {
    a = PCJ.myId() + 10;    // set value of a
    long sum = 0;
    if (PCJ.myId() == 0) {
      sum = PCJ.reduce(Lomg::sum, Shared.a);
   System.out.println(PCJ.myId()+ " "+ sum);

In the presented example the values are communicated to the thread with the id 0. Than reduction operation such as summation is performed. Than value of the variable a (Shared variable) stored at the thread p is communicated to the thread 0 and added to the local variable sum.

import org.pcj.*;

public class PcjReductionGet implements StartPoint {
    enum Shared { a }
    long a; 

    public static void main(String[] args) throws IOException {
    String nodesFile  = "nodes.txt";
    PCJ.executionBuilder (PcjExample.class)
                .addNodes(new File("nodes.txt"))
    public void main() throws Throwable {
        a = PCJ.myId() + 10;    // set value of a
        long sum = 0; 
        if (PCJ.myId() == 0) {
            for (int p = 1; p < PCJ.threadCount(); p++) {
                sum = sum + (long) PCJ.get(p, Shared.a);
      System.out.println(PCJ.myId()+ " "+ sum);

The presented algorithm of the reduction is based on the synchronous communication PCJ.get(). The summation is performed at thread 0 as data is arrived.

The asynchronous version requires additional storage at the thread 0. The al[]variable stores values of a variable communicated in asynchronous way:

import org.pcj.*;

public class PcjReductionGet implements StartPoint {

    enum Shared { a }
    long a;

    public static void main(String[] args) throws IOException {
    String nodesFile  = "nodes.txt";
    PCJ.executionBuilder (PcjExample.class)
                .addNodes(new File("nodes.txt"))

    public void main() throws Throwable {
       PcjFuture aL[] = new PcjFuture[PCJ.threadCount()];
       long sum;
       if (PCJ.myId() == 0) {         // Asynchronous communication
            for (int p = 0; p < PCJ.threadCount(); p++) {
                aL[p] = PCJ.asyncGet(p, Shared.a);
       PCJ.barrier();                 // Synchronization
       sum = 0;
       if (PCJ.myId() == 0) {         // Sumation of local values
            for (int p = 0; p < PCJ.threadCount(); p++) {
                sum = sum + (long) aL[p].get();
       System.out.println(PCJ.myId() + " " + sum);

Last updated

Was this helpful?