// Shared progress counter: incremented once per processed element and reset to 0 on cancel.
private static AtomicLong atomicLong = new AtomicLong();
// True while a run started by ActionTask.execute() is considered active; cleared on cancel.
private static AtomicBoolean isTaskLastRun = new AtomicBoolean();
/**
 * Fork/join task (JDK 1.7 ForkJoin framework) that applies a function in place to a
 * slice of an array, halving the slice until it is shorter than a sequential threshold.
 * <p>
 * Sub-tasks are forked and never joined, so processing is asynchronous: {@link #compute()}
 * always returns {@code null} and a parent task may complete before its sub-tasks do.
 * Progress is published through the shared {@code atomicLong} counter, which is passed to
 * the observer after each sequentially processed slice.
 * <p>
 * Only array-backed tasks do any work; a task built from a {@code List} is a no-op.
 * One {@link ForkJoinPool} is created per root task and shared with every sub-task.
 *
 * @param <T> type of the function applied to each element
 * @param <E> array element type (also the nominal, always-{@code null} task result type)
 * @param <V> element type of the list-backed variant
 */
private static class RecursiveConcurrent<T extends Function ,E ,V > extends RecursiveTask<E> {
    private static final long serialVersionUID = 5232443952276485270L;
    private static final int MAX_CAP = 0x7fff; // 32767 max #workers - 1
    // Slices shorter than this are processed sequentially instead of being split further.
    // The stated intent is a value between 100 and 10000.
    // NOTE(review): for 11..20 cores this evaluates to NUMBER_OF_CORES * 1000, which is
    // above 10000 — confirm whether the guard should compare NUMBER_OF_CORES * 1000.
    private final int seqThreshHold = NUMBER_OF_CORES > 1
            ? NUMBER_OF_CORES * 500 > 10000
            ? 2000 : NUMBER_OF_CORES * 1000 : 2000 ;
    // Pool owned by the root task; sub-tasks only hold a reference to the same instance.
    private final ForkJoinPool fjp;
    private final E[] data;
    private final List<V> collection;
    private final T func ;
    private final int start , end ;
    private final Observer<AtomicLong> observed;

    /**
     * Creates a root task over {@code data[start, end)} together with its own pool.
     *
     * @param data     array whose elements are replaced by the function result
     * @param action   function applied to every element of the slice
     * @param start    first index of the slice (inclusive)
     * @param end      last index of the slice (exclusive)
     * @param observed callback that receives the shared counter after each processed slice
     */
    RecursiveConcurrent(E[] data, T action, int start, int end, Observer<AtomicLong> observed) {
        this(data, null, action, start, end, observed,
                new ForkJoinPool(Math.min(MAX_CAP, NUMBER_OF_CORES)));
    }

    /**
     * Creates a root task backed by a list. {@link #compute()} performs no work for
     * list-backed tasks.
     *
     * @param data     list associated with the task
     * @param action   function associated with the task
     * @param start    first index of the range (inclusive)
     * @param end      last index of the range (exclusive)
     * @param observed progress callback
     */
    RecursiveConcurrent(List<V> data, T action, int start, int end, Observer<AtomicLong> observed) {
        this(null, data, action, start, end, observed,
                new ForkJoinPool(Math.min(MAX_CAP, NUMBER_OF_CORES)));
    }

    /**
     * Shared initializer. Sub-tasks use it directly so they reuse the parent's pool
     * instead of allocating a new {@link ForkJoinPool} for every split.
     */
    private RecursiveConcurrent(E[] data, List<V> collection, T action, int start, int end,
                                Observer<AtomicLong> observed, ForkJoinPool pool) {
        this.data = data;
        this.collection = collection;
        this.func = action;
        this.start = start;
        this.end = end;
        this.observed = observed;
        this.fjp = pool;
    }

    /**
     * Processes the slice sequentially when it is shorter than the threshold, otherwise
     * forks two half-sized sub-tasks without waiting for them.
     *
     * @return always {@code null}; results are written into the array in place
     */
    @SuppressWarnings("unchecked")
    @Override
    protected E compute() {
        if (this.collection != null || this.data == null || this.data.length == 0) {
            return null; // list-backed or empty input: nothing to process
        }
        if ((end - start) < seqThreshHold) {
            for (int i = start; i < end; i++) {
                // func is bounded by the raw Function type, hence the unchecked cast.
                data[i] = (E) func.apply(data[i]);
                atomicLong.incrementAndGet();
            }
            observed.update(atomicLong);
        } else {
            // Divide and conquer; written this way so start + end cannot overflow.
            int middle = start + (end - start) / 2;
            RecursiveConcurrent<T, E, V> lower =
                    new RecursiveConcurrent<T, E, V>(data, null, func, start, middle, observed, fjp);
            RecursiveConcurrent<T, E, V> upper =
                    new RecursiveConcurrent<T, E, V>(data, null, func, middle, end, observed, fjp);
            // Asynchronous on purpose: both halves are forked and never joined.
            lower.fork();
            upper.fork();
        }
        return null;
    }

    /**
     * Submits the given task to this task's pool without waiting for completion.
     *
     * @param recurSiveCon task to run asynchronously
     */
    private void execute(RecursiveConcurrent recurSiveCon) {
        fjp.execute(recurSiveCon);
    }

    /**
     * Requests shutdown of the pool if that has not been requested yet.
     *
     * @return whether the pool reports itself as shut down afterwards
     */
    private boolean cancel() {
        if (!fjp.isShutdown()) {
            fjp.shutdown();
        }
        return fjp.isShutdown();
    }
}
/**
 * Entry point that fills a {@code Double[]} with the values {@code 0..n-1} and replaces
 * each element by its square root on a fork/join pool, reporting progress to an observer.
 * <p>
 * Only one run is active at a time: {@link #execute()} does nothing while the shared
 * {@code isTaskLastRun} flag is set, and {@link #cancel()} clears that flag again.
 */
public static class ActionTask {
    // Root task of the most recently started run; null until execute() has started one.
    private static RecursiveConcurrent<Function<Double,Double>,Double ,Object> rec;
    private Observer<AtomicLong> observed;
    private Function<Double,Double> myFunction = Math::sqrt ;
    private int mainForIndex ;
    private static Double[] data;

    /**
     * @param observed     callback that receives the shared progress counter
     * @param mainForIndex number of elements to generate and process
     */
    public ActionTask(Observer<AtomicLong> observed, int mainForIndex) {
        this.observed = observed;
        this.mainForIndex = mainForIndex;
    }

    /**
     * Shuts down the pool of the most recently started run and, when the pool reports
     * shutdown, resets the shared counter and the running flag.
     *
     * @return {@code false} when no run was ever started, otherwise whether the root
     *         task has completed
     */
    public static boolean cancel() {
        RecursiveConcurrent<Function<Double,Double>,Double ,Object> current = rec;
        if (current == null) {
            return false; // nothing to cancel; previously this dereferenced null
        }
        if (current.cancel()) {
            atomicLong.set(0);
            isTaskLastRun.set(false);
        }
        return current.isDone();
    }

    /**
     * Starts a new asynchronous run unless one is already marked as running.
     * The input array is only allocated when a run actually starts.
     */
    public void execute() {
        // Atomic check-and-set so two concurrent callers cannot both start a run.
        if (!isTaskLastRun.compareAndSet(false, true)) {
            return;
        }
        boolean started = false;
        try {
            Double[] values = new Double[mainForIndex];
            for (int i = 0; i < values.length; i++) {
                values[i] = (double) i;
            }
            data = values;
            rec = new RecursiveConcurrent<Function<Double,Double>,Double ,Object>(data, myFunction, 0, data.length, observed);
            rec.execute(rec);
            started = true;
        } finally {
            if (!started) {
                // Start-up failed (e.g. allocation): do not leave the flag stuck at true.
                isTaskLastRun.set(false);
            }
        }
    }
}
public void registerTask(View v) {
HandleThreadService.Observer<AtomicLong> observer = this::upDateUiThread;
new HandleThreadService.ActionTask(observer,5000000).e xecute();
}
/**
 * Progress callback: posts a UI update that shows the current counter value and,
 * once the counter has reached 4999999, cancels the task and shows the completion text
 * if the cancel call reports success.
 *
 * @param atomicLong shared progress counter; it is read when the posted action runs
 */
private void upDateUiThread(AtomicLong atomicLong) {
    final Runnable refresh = () -> {
        final long processed = atomicLong.get();
        if (processed > 0) {
            tvDate.setText("counter : " + processed);
        }
        if (processed >= 4999999 && HandleThreadService.ActionTask.cancel()) {
            tvDate.setText("Compelet.!");
        }
    };
    runOnUiThread(refresh);
}
/**
 * Single-method callback that receives a value pushed by a producer.
 *
 * @param <T> type of the value delivered to the callback
 */
@FunctionalInterface
public interface Observer<T> {

    /**
     * Delivers a value to this observer.
     *
     * @param value the value being reported
     */
    void update(T value);
}