A reduction operation is widely used to combine the values of a variable stored on different threads into a single result.
import java.io.File;
import java.io.IOException;
import org.pcj.*;
@RegisterStorage(PcjReduction.Shared.class)
public class PcjReduction implements StartPoint {
    @Storage(PcjReduction.class)
    enum Shared { a }
    public long a;
    public static void main(String[] args) throws IOException {
        String nodesFile = "nodes.txt";
        PCJ.executionBuilder(PcjReduction.class)
                .addNodes(new File(nodesFile))
                .start();
    }
    @Override
    public void main() throws Throwable {
        a = PCJ.myId() + 10; // set value of a
        PCJ.barrier(); // make sure every thread has set a before it is read
        long sum = 0;
        if (PCJ.myId() == 0) {
            sum = PCJ.reduce(Long::sum, Shared.a); // sum a over all threads
        }
        PCJ.barrier();
        System.out.println(PCJ.myId() + " " + sum);
    }
}
In the example above, the values are communicated to the thread with id 0, where a reduction operation, here summation, is performed. The same result can be obtained with explicit communication: the value of the shared variable a stored on thread p is communicated to thread 0 and added to the local variable sum.
import java.io.File;
import java.io.IOException;
import org.pcj.*;
@RegisterStorage(PcjReductionGet.Shared.class)
public class PcjReductionGet implements StartPoint {
    @Storage(PcjReductionGet.class)
    enum Shared { a }
    long a;
    public static void main(String[] args) throws IOException {
        String nodesFile = "nodes.txt";
        PCJ.executionBuilder(PcjReductionGet.class)
                .addNodes(new File(nodesFile))
                .start();
    }
    @Override
    public void main() throws Throwable {
        a = PCJ.myId() + 10; // set value of a
        long sum = 0;
        PCJ.barrier(); // make sure every thread has set a before it is read
        if (PCJ.myId() == 0) {
            // start at p = 0 so that the value held by thread 0 is included
            for (int p = 0; p < PCJ.threadCount(); p++) {
                sum = sum + (long) PCJ.get(p, Shared.a);
            }
        }
        PCJ.barrier();
        System.out.println(PCJ.myId() + " " + sum);
    }
}
The reduction algorithm presented above is based on the synchronous communication call PCJ.get(). Thread 0 performs the summation as each value arrives.
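As a quick sanity check on the number thread 0 should print, the sum over all threads (thread 0 included) can be computed in plain Java without PCJ. This is only a sketch: it assumes every thread p holds a = p + 10, as in the listings, and the names ExpectedSum and expectedSum are hypothetical and not part of PCJ.

```java
import java.util.stream.LongStream;

public class ExpectedSum {
    // Sum of (p + 10) for p = 0 .. threadCount - 1, folded with Long::sum,
    // the same binary operator that is passed to the reduction.
    static long expectedSum(int threadCount) {
        return LongStream.range(0, threadCount)
                .map(p -> p + 10)
                .reduce(0L, Long::sum);
    }

    public static void main(String[] args) {
        int threadCount = 4; // assumed number of PCJ threads
        System.out.println(expectedSum(threadCount)); // 10 + 11 + 12 + 13 = 46
    }
}
```

With four threads the expected sum is 46; all other threads keep their local sum at 0.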
The asynchronous version requires additional storage on thread 0. The aL[] array holds the futures for the values of the variable a, which are communicated asynchronously:
import java.io.File;
import java.io.IOException;
import org.pcj.*;
@RegisterStorage(PcjReductionGet.Shared.class)
public class PcjReductionGet implements StartPoint {
    @Storage(PcjReductionGet.class)
    enum Shared { a }
    long a;
    public static void main(String[] args) throws IOException {
        String nodesFile = "nodes.txt";
        PCJ.executionBuilder(PcjReductionGet.class)
                .addNodes(new File(nodesFile))
                .start();
    }
    @Override
    public void main() throws Throwable {
        a = PCJ.myId() + 10; // set value of a
        PcjFuture[] aL = new PcjFuture[PCJ.threadCount()];
        PCJ.barrier(); // make sure every thread has set a before it is read
        long sum;
        if (PCJ.myId() == 0) { // Asynchronous communication
            for (int p = 0; p < PCJ.threadCount(); p++) {
                aL[p] = PCJ.asyncGet(p, Shared.a); // returns immediately
            }
        }
        PCJ.barrier(); // Synchronization
        sum = 0;
        if (PCJ.myId() == 0) { // Summation of the received values
            for (int p = 0; p < PCJ.threadCount(); p++) {
                sum = sum + (long) aL[p].get(); // waits only if the value has not arrived yet
            }
        }
        System.out.println(PCJ.myId() + " " + sum);
    }
}
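The issue-then-collect pattern of the asynchronous version can be tried without a PCJ installation. The following is only an analogy under stated assumptions: CompletableFuture from the Java standard library stands in for PcjFuture, a task submitted to a thread pool stands in for a remote PCJ.asyncGet(), and the names AsyncGatherSketch and gatherSum are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncGatherSketch {
    // threadCount must be at least 1.
    static long gatherSum(int threadCount) {
        ExecutorService pool = Executors.newFixedThreadPool(threadCount);
        try {
            // Phase 1: start every request without waiting for any result.
            List<CompletableFuture<Long>> pending = new ArrayList<>();
            for (int p = 0; p < threadCount; p++) {
                final long value = p + 10; // value held by "thread" p
                pending.add(CompletableFuture.supplyAsync(() -> value, pool));
            }
            // Phase 2: collect the results; join() blocks only if a value
            // has not been produced yet.
            long sum = 0;
            for (CompletableFuture<Long> f : pending) {
                sum += f.join();
            }
            return sum;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(gatherSum(4)); // 10 + 11 + 12 + 13 = 46
    }
}
```

The list of pending futures plays the role of the aL[] array: it is the additional storage that lets all transfers be in flight before the summation begins.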