Reduction

Reduction operation is widely used to gather values of some variable stored on different threads.

import java.io.IOException;
import org.pcj.*;

@RegisterStorage(PcjReduction.Shared.class)
public class PcjReduction implements StartPoint {

@Storage(PcjExample.class)
enum Shared { a }
public long a;

public static void main(String[] args) throws IOException {
    String nodesFile  = "nodes.txt";
    PCJ.executionBuilder (PcjExample.class)
                .addNodes(new File("nodes.txt"))
                .start();
}

@Override
public void main() throws Throwable {
    a = PCJ.myId() + 10;    // set value of a
    long sum = 0;
    
    if (PCJ.myId() == 0) {
      sum = PCJ.reduce(Lomg::sum, Shared.a);
   }
   PCJ.barrier();
   System.out.println(PCJ.myId()+ " "+ sum);
}
}

In the presented example the values are communicated to the thread with the id 0. Than reduction operation such as summation is performed. Than value of the variable a (Shared variable) stored at the thread p is communicated to the thread 0 and added to the local variable sum.

import java.io.IOException;
import org.pcj.*;

@RegisterStorage(PcjReductionGet.Shared.class)
public class PcjReductionGet implements StartPoint {
    
    @Storage(PcjReductionGet.class)
    enum Shared { a }
    long a; 

    public static void main(String[] args) throws IOException {
    String nodesFile  = "nodes.txt";
    PCJ.executionBuilder (PcjExample.class)
                .addNodes(new File("nodes.txt"))
                .start();
    }
    
    @Override
    public void main() throws Throwable {
        a = PCJ.myId() + 10;    // set value of a
        long sum = 0; 
        PCJ.barrier();
        if (PCJ.myId() == 0) {
            for (int p = 1; p < PCJ.threadCount(); p++) {
                sum = sum + (long) PCJ.get(p, Shared.a);
            }
        }
      PCJ.barrier();
      System.out.println(PCJ.myId()+ " "+ sum);
    }
}

The presented algorithm of the reduction is based on the synchronous communication PCJ.get(). The summation is performed at thread 0 as data is arrived.

The asynchronous version requires additional storage at the thread 0. The al[]variable stores values of a variable communicated in asynchronous way:

import java.io.IOException;
import org.pcj.*;

@RegisterStorage(PcjReductionGet.Shared.class)
public class PcjReductionGet implements StartPoint {

    @Storage(PcjReductionGet.class)
    enum Shared { a }
    long a;

    public static void main(String[] args) throws IOException {
    String nodesFile  = "nodes.txt";
    PCJ.executionBuilder (PcjExample.class)
                .addNodes(new File("nodes.txt"))
                .start();
    }

    @Override
    public void main() throws Throwable {
       PcjFuture aL[] = new PcjFuture[PCJ.threadCount()];
       PCJ.barrier();
       long sum;
       if (PCJ.myId() == 0) {         // Asynchronous communication
            for (int p = 0; p < PCJ.threadCount(); p++) {
                aL[p] = PCJ.asyncGet(p, Shared.a);
            }
       }
       PCJ.barrier();                 // Synchronization
       sum = 0;
       if (PCJ.myId() == 0) {         // Sumation of local values
            for (int p = 0; p < PCJ.threadCount(); p++) {
                sum = sum + (long) aL[p].get();
            }
       }
       System.out.println(PCJ.myId() + " " + sum);
    }
}

Last updated