Reduction

A reduction operation is widely used to gather the values of a variable stored on different threads and combine them into a single result.

In the presented example the values are communicated to the thread with id 0, where a reduction operation, here summation, is performed. The value of the shared variable a stored at thread p is communicated to thread 0 and added to its local variable sum.

import java.io.File;
import java.io.IOException;
import org.pcj.*;

@RegisterStorage(PcjReductionGet.Shared.class)
public class PcjReductionGet implements StartPoint {

    @Storage(PcjReductionGet.class)
    enum Shared { a }
    long a;

    public static void main(String[] args) throws IOException {
        String nodesFile = "nodes.txt";
        PCJ.executionBuilder(PcjReductionGet.class)
                .addNodes(new File(nodesFile))
                .start();
    }

    @Override
    public void main() throws Throwable {
        a = PCJ.myId() + 10; // set value of a
        long sum = 0;
        PCJ.barrier(); // make sure every thread has set a
        if (PCJ.myId() == 0) {
            // p starts at 0 so that the value held by thread 0 is included
            for (int p = 0; p < PCJ.threadCount(); p++) {
                sum = sum + (long) PCJ.get(p, Shared.a);
            }
        }
        PCJ.barrier();
        System.out.println(PCJ.myId() + " " + sum);
    }
}

The presented reduction algorithm is based on the synchronous communication PCJ.get(). Each call blocks until the requested value has been received, so the summation is performed at thread 0 as the data arrives.
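As a quick sanity check, the value printed by thread 0 can be predicted without running PCJ. The sketch below is plain Java and only assumes that every thread sets a to its id plus 10, as in the listing above. The class ReductionCheck and the method expectedSum are hypothetical names introduced for illustration; firstId is the thread id at which the summation loop starts.

```java
// Hypothetical helper, independent of PCJ: predicts the sum held by thread 0.
final class ReductionCheck {

    // Sum of (id + 10) over thread ids firstId .. threadCount - 1,
    // mirroring "a = PCJ.myId() + 10" in the listing.
    static long expectedSum(int firstId, int threadCount) {
        long sum = 0;
        for (int p = firstId; p < threadCount; p++) {
            sum += p + 10L;
        }
        return sum;
    }

    public static void main(String[] args) {
        // Four threads, all ids included: 10 + 11 + 12 + 13
        System.out.println(expectedSum(0, 4)); // prints 46
    }
}
```

All other threads print 0, because their local sum is never updated.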

The asynchronous version requires additional storage at thread 0. The aL[] array holds the values of the variable a communicated asynchronously, each wrapped in a PcjFuture:

import java.io.File;
import java.io.IOException;
import org.pcj.*;

@RegisterStorage(PcjReductionGet.Shared.class)
public class PcjReductionGet implements StartPoint {

    @Storage(PcjReductionGet.class)
    enum Shared { a }
    long a;

    public static void main(String[] args) throws IOException {
        String nodesFile = "nodes.txt";
        PCJ.executionBuilder(PcjReductionGet.class)
                .addNodes(new File(nodesFile))
                .start();
    }

    @Override
    public void main() throws Throwable {
        a = PCJ.myId() + 10; // set value of a, as in the synchronous version
        PcjFuture[] aL = new PcjFuture[PCJ.threadCount()];
        PCJ.barrier();
        long sum;
        if (PCJ.myId() == 0) { // Asynchronous communication
            for (int p = 0; p < PCJ.threadCount(); p++) {
                aL[p] = PCJ.asyncGet(p, Shared.a);
            }
        }
        PCJ.barrier(); // Synchronization
        sum = 0;
        if (PCJ.myId() == 0) { // Summation of local values
            for (int p = 0; p < PCJ.threadCount(); p++) {
                sum = sum + (long) aL[p].get();
            }
        }
        System.out.println(PCJ.myId() + " " + sum);
    }
}
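
The two-phase structure of the asynchronous version (issue all requests first, collect the results afterwards) can be tried without a PCJ installation. The following is only an analogy in plain Java: CompletableFuture from the standard library stands in for PcjFuture, and the hypothetical method fetchValue stands in for PCJ.asyncGet(). It assumes that the value held by thread p is p + 10. It does not reproduce PCJ's communication or barriers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Plain-Java analogy of the asynchronous reduction; not PCJ code.
final class AsyncReductionSketch {

    // Hypothetical stand-in for an asynchronous read of variable a on thread p.
    static CompletableFuture<Long> fetchValue(int p) {
        return CompletableFuture.supplyAsync(() -> p + 10L);
    }

    static long reduce(int threadCount) {
        // Phase 1: start every request without waiting for any result.
        List<CompletableFuture<Long>> pending = new ArrayList<>();
        for (int p = 0; p < threadCount; p++) {
            pending.add(fetchValue(p));
        }
        // Phase 2: wait for each result in turn and accumulate the sum.
        long sum = 0;
        for (CompletableFuture<Long> future : pending) {
            sum += future.join();
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(reduce(4)); // prints 46 (10 + 11 + 12 + 13)
    }
}
```

Because all requests are already in flight before the first result is awaited, the waiting times overlap instead of adding up, which is the motivation for keeping the futures in an array.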