A reduction operation is widely used to combine the values of a variable stored on different threads into a single result.

    import java.io.File;
    import java.io.IOException;
    import org.pcj.*;

    @RegisterStorage(PcjReduction.Shared.class)
    public class PcjReduction implements StartPoint {

        @Storage(PcjReduction.class)
        enum Shared { a }
        public long a;

        public static void main(String[] args) throws IOException {
            String nodesFile = "nodes.txt";
            PCJ.executionBuilder(PcjReduction.class)
               .addNodes(new File(nodesFile))
               .start();
        }

        @Override
        public void main() throws Throwable {
            a = PCJ.myId() + 10;    // set value of a
            long sum = 0;
            PCJ.barrier();          // make sure every thread has set a before it is read
            if (PCJ.myId() == 0) {
                sum = PCJ.reduce(Long::sum, Shared.a);
            }
            PCJ.barrier();
            System.out.println(PCJ.myId() + " " + sum);
        }
    }

In this example the values of a are communicated to the thread with id 0, where a reduction operation (here, summation) is applied. The same kind of reduction can also be written by hand: the value of the shared variable a stored on thread p is communicated to thread 0 and added to the local variable sum.

    import java.io.File;
    import java.io.IOException;
    import org.pcj.*;

    @RegisterStorage(PcjReductionGet.Shared.class)
    public class PcjReductionGet implements StartPoint {

        @Storage(PcjReductionGet.class)
        enum Shared { a }
        long a;

        public static void main(String[] args) throws IOException {
            String nodesFile = "nodes.txt";
            PCJ.executionBuilder(PcjReductionGet.class)
               .addNodes(new File(nodesFile))
               .start();
        }

        @Override
        public void main() throws Throwable {
            a = PCJ.myId() + 10;    // set value of a
            long sum = 0;
            PCJ.barrier();          // every thread has set a before it is read
            if (PCJ.myId() == 0) {
                // p starts at 0 so that thread 0's own value is included in the sum
                for (int p = 0; p < PCJ.threadCount(); p++) {
                    sum = sum + (long) PCJ.get(p, Shared.a);
                }
            }
            PCJ.barrier();
            System.out.println(PCJ.myId() + " " + sum);
        }
    }

This reduction algorithm is based on the synchronous communication method PCJ.get(). The summation is performed on thread 0 as the data arrive, one value at a time.
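To check the value that thread 0 should print, the fold can be reproduced in plain Java without PCJ. The sketch below is only an illustration: the class ReductionCheck, its helper methods, and the thread count of 4 are hypothetical, and it assumes the reduction covers every thread, including thread 0. It also shows that the operator passed to a reduction does not have to be a sum.

```java
import java.util.function.LongBinaryOperator;

// Hypothetical helper, not part of PCJ: emulates the reduction sequentially.
final class ReductionCheck {

    // Values held by each thread in the examples: a = id + 10.
    static long[] perThreadValues(int threadCount) {
        long[] values = new long[threadCount];
        for (int id = 0; id < threadCount; id++) {
            values[id] = id + 10;
        }
        return values;
    }

    // Fold the per-thread values with the given operator, one value at a time.
    // Requires at least one value.
    static long reduce(long[] values, LongBinaryOperator op) {
        long result = values[0];
        for (int p = 1; p < values.length; p++) {
            result = op.applyAsLong(result, values[p]);
        }
        return result;
    }

    public static void main(String[] args) {
        int threadCount = 4;    // assumed number of threads
        long[] a = perThreadValues(threadCount);
        System.out.println(reduce(a, Long::sum));   // 10 + 11 + 12 + 13 = 46
        System.out.println(reduce(a, Long::max));   // 13
    }
}
```

For n threads the sum equals 10n + n(n-1)/2, which gives 46 for n = 4.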
The asynchronous version requires additional storage on thread 0. The aL[] array holds the PcjFuture objects through which the values of a are communicated asynchronously:

    import java.io.File;
    import java.io.IOException;
    import org.pcj.*;

    @RegisterStorage(PcjReductionGet.Shared.class)
    public class PcjReductionGet implements StartPoint {

        @Storage(PcjReductionGet.class)
        enum Shared { a }
        long a;

        public static void main(String[] args) throws IOException {
            String nodesFile = "nodes.txt";
            PCJ.executionBuilder(PcjReductionGet.class)
               .addNodes(new File(nodesFile))
               .start();
        }

        @Override
        public void main() throws Throwable {
            a = PCJ.myId() + 10;    // set value of a, as in the previous examples
            PcjFuture[] aL = new PcjFuture[PCJ.threadCount()];
            PCJ.barrier();
            long sum;
            if (PCJ.myId() == 0) {    // Asynchronous communication
                for (int p = 0; p < PCJ.threadCount(); p++) {
                    aL[p] = PCJ.asyncGet(p, Shared.a);
                }
            }
            PCJ.barrier();            // Synchronization
            sum = 0;
            if (PCJ.myId() == 0) {    // Summation of local values
                for (int p = 0; p < PCJ.threadCount(); p++) {
                    sum = sum + (long) aL[p].get();
                }
            }
            System.out.println(PCJ.myId() + " " + sum);
        }
    }

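The two-phase pattern used here (issue all requests first, collect the results afterwards) can be tried without a PCJ installation. The sketch below is an analogy only: it uses the standard java.util.concurrent.CompletableFuture in place of PcjFuture, and the class AsyncReductionSketch and the method fetchValue are hypothetical stand-ins for a remote read of a.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical analogue of the asynchronous reduction, using only the JDK.
final class AsyncReductionSketch {

    // Stand-in for a remote read: yields the value a = id + 10
    // that thread `id` holds in the PCJ examples.
    static CompletableFuture<Long> fetchValue(int id) {
        return CompletableFuture.supplyAsync(() -> id + 10L);
    }

    static long asyncSum(int threadCount) {
        // Phase 1: start every request without waiting for any result.
        List<CompletableFuture<Long>> pending = new ArrayList<>(threadCount);
        for (int p = 0; p < threadCount; p++) {
            pending.add(fetchValue(p));
        }
        // Phase 2: collect the results; join() blocks only if a value
        // has not arrived yet.
        long sum = 0;
        for (CompletableFuture<Long> future : pending) {
            sum += future.join();
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(asyncSum(4));    // 10 + 11 + 12 + 13 = 46
    }
}
```

Because all requests are in flight before the first result is awaited, the transfers can overlap, whereas the synchronous loop waits for each value in turn.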